Faster email deletion, Junk/Trash folder auto-expunge and changelog auto-expiry (closes #403)

This commit is contained in:
mdecimus 2024-05-19 12:15:35 +02:00
parent 21137080f8
commit 66cc0a3072
29 changed files with 1548 additions and 664 deletions

124
Cargo.lock generated
View file

@ -178,6 +178,15 @@ version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3"
[[package]]
name = "arbitrary"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d5a26814d8dcb93b0e5a0ff3c6d80a8843bafb21b39e8e18a6f05471870e110"
dependencies = [
"derive_arbitrary",
]
[[package]]
name = "arc-swap"
version = "1.7.1"
@ -1052,7 +1061,7 @@ dependencies = [
"utils",
"whatlang",
"x509-parser 0.16.0",
"zip",
"zip 0.6.6",
]
[[package]]
@ -1117,6 +1126,21 @@ dependencies = [
"libc",
]
[[package]]
name = "crc"
version = "3.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636"
dependencies = [
"crc-catalog",
]
[[package]]
name = "crc-catalog"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5"
[[package]]
name = "crc16"
version = "0.4.0"
@ -1427,6 +1451,12 @@ dependencies = [
"regex",
]
[[package]]
name = "deflate64"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83ace6c86376be0b6cdcf3fb41882e81d94b31587573d1cfa9d01cd06bba210d"
[[package]]
name = "der"
version = "0.7.9"
@ -1476,6 +1506,17 @@ dependencies = [
"serde",
]
[[package]]
name = "derive_arbitrary"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.63",
]
[[package]]
name = "des"
version = "0.8.1"
@ -1907,6 +1948,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f54427cfd1c7829e2a139fcefea601bf088ebca651d2bf53ebc600eac295dae"
dependencies = [
"crc32fast",
"libz-ng-sys",
"libz-sys",
"miniz_oxide",
]
@ -3196,6 +3238,16 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "libz-ng-sys"
version = "1.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6409efc61b12687963e602df8ecf70e8ddacf95bc6576bcf16e3ac6328083c5"
dependencies = [
"cmake",
"libc",
]
[[package]]
name = "libz-sys"
version = "1.1.16"
@ -3270,10 +3322,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
[[package]]
name = "mail-auth"
version = "0.3.11"
name = "lzma-rs"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e9759ecef5c0d048464fee80947ca5ef25faff98add10ea8787a6e195b8dc5f"
checksum = "297e814c836ae64db86b36cf2a557ba54368d03f6afcd7d947c266692f71115e"
dependencies = [
"byteorder",
"crc",
]
[[package]]
name = "mail-auth"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40cffb3edf88628478c089488dd03c5c62a3af613d601efdfc3c18ced1d737ca"
dependencies = [
"ahash 0.8.11",
"flate2",
@ -3289,7 +3351,7 @@ dependencies = [
"rustls-pemfile 2.1.2",
"serde",
"serde_json",
"zip",
"zip 1.3.0",
]
[[package]]
@ -5636,6 +5698,12 @@ dependencies = [
"rand_core",
]
[[package]]
name = "simd-adler32"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe"
[[package]]
name = "simdutf8"
version = "0.1.4"
@ -6529,6 +6597,12 @@ dependencies = [
"static_assertions",
]
[[package]]
name = "typed-arena"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6af6ae20167a9ece4bcb41af5b80f8a1f1df981f6391189ce00fd257af04126a"
[[package]]
name = "typenum"
version = "1.17.0"
@ -7285,6 +7359,46 @@ dependencies = [
"zstd 0.11.2+zstd.1.5.2",
]
[[package]]
name = "zip"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1f4a27345eb6f7aa7bd015ba7eb4175fa4e1b462a29874b779e0bbcf96c6ac7"
dependencies = [
"aes",
"arbitrary",
"bzip2",
"constant_time_eq 0.3.0",
"crc32fast",
"crossbeam-utils",
"deflate64",
"displaydoc",
"flate2",
"hmac 0.12.1",
"indexmap 2.2.6",
"lzma-rs",
"pbkdf2 0.12.2",
"rand",
"sha1",
"thiserror",
"time",
"zeroize",
"zopfli",
"zstd 0.13.1",
]
[[package]]
name = "zopfli"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c1f48f3508a3a3f2faee01629564400bc12260f6214a056d06a3aaaa6ef0736"
dependencies = [
"crc32fast",
"log",
"simd-adler32",
"typed-arena",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"

View file

@ -10,9 +10,11 @@ use utils::config::{cron::SimpleCron, utils::ParseValue, Config, Rate};
pub struct JmapConfig {
pub default_language: Language,
pub query_max_results: usize,
pub changes_max_results: usize,
pub snippet_max_results: usize,
pub changes_max_results: usize,
pub changes_max_history: Option<Duration>,
pub request_max_size: usize,
pub request_max_calls: usize,
pub request_max_concurrent: u64,
@ -32,6 +34,7 @@ pub struct JmapConfig {
pub mail_attachments_max_size: usize,
pub mail_parse_max_items: usize,
pub mail_max_size: usize,
pub mail_autoexpunge_after: Option<Duration>,
pub sieve_max_script_name: usize,
pub sieve_max_scripts: usize,
@ -76,6 +79,7 @@ pub struct JmapConfig {
pub capabilities: BaseCapabilities,
pub session_purge_frequency: SimpleCron,
pub account_purge_frequency: SimpleCron,
}
impl JmapConfig {
@ -146,6 +150,9 @@ impl JmapConfig {
changes_max_results: config
.property("jmap.protocol.changes.max-results")
.unwrap_or(5000),
changes_max_history: config
.property_or_default::<Option<Duration>>("jmap.protocol.changes.max-history", "30d")
.unwrap_or_default(),
snippet_max_results: config
.property("jmap.protocol.search-snippet.max-results")
.unwrap_or(100),
@ -189,6 +196,9 @@ impl JmapConfig {
.unwrap_or(50000000),
mail_max_size: config.property("jmap.email.max-size").unwrap_or(75000000),
mail_parse_max_items: config.property("jmap.email.parse.max-items").unwrap_or(10),
mail_autoexpunge_after: config
.property_or_default::<Option<Duration>>("jmap.email.auto-expunge", "30d")
.unwrap_or_default(),
sieve_max_script_name: config
.property("sieve.untrusted.limits.name-length")
.unwrap_or(512),
@ -301,6 +311,9 @@ impl JmapConfig {
session_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.session.purge.frequency", "15 * *")
.unwrap_or_else(|| SimpleCron::parse_value("15 * *").unwrap()),
account_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.account.purge.frequency", "0 0 *")
.unwrap_or_else(|| SimpleCron::parse_value("15 * *").unwrap()),
fallback_admin: config
.value("authentication.fallback-admin.user")
.and_then(|u| {

View file

@ -162,7 +162,7 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
if account_id != u32::MAX && document_id != u32::MAX {
if reader.version == 1 && collection == email_collection {
batch.set(
ValueClass::FtsQueue(FtsQueueClass::Insert {
ValueClass::FtsQueue(FtsQueueClass {
seq,
hash: hash.clone(),
}),

View file

@ -38,7 +38,10 @@ use jmap_proto::{
type_state::DataType,
},
};
use store::write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE};
use store::{
roaring::RoaringBitmap,
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE},
};
impl<T: SessionStream> Session<T> {
pub async fn handle_copy_move(
@ -286,6 +289,7 @@ impl<T: SessionStream> SessionData<T> {
.with_code(ResponseCode::ContactAdmin)
})?
.quota as i64;
let mut destroy_ids = RoaringBitmap::new();
for (id, imap_id) in ids {
match self
.jmap
@ -323,77 +327,23 @@ impl<T: SessionStream> SessionData<T> {
};
if is_move {
// Obtain mailbox tags
let (mut mailboxes, thread_id) = if let Some(result) = self
.get_mailbox_tags(src_account_id, id)
.await
.map_err(|_| StatusResponse::database_failure().with_tag(&arguments.tag))?
{
result
} else {
continue;
};
// Make sure the message is still in the mailbox
let src_mailbox_id = UidMailbox::new_unassigned(src_mailbox.id.mailbox_id);
if !mailboxes.current().contains(&src_mailbox_id) {
continue;
} else if mailboxes.current().len() == 1 {
// Delete message if it is no longer in any mailbox
if let Ok(changes) = self
.jmap
.email_delete(src_account_id, id)
.await
.map_err(|_| {
StatusResponse::database_failure().with_tag(&arguments.tag)
})?
{
did_move = true;
changelog.merge(changes);
}
} else {
// Remove mailbox tag from message
let mut batch = BatchBuilder::new();
batch
.with_account_id(src_account_id)
.with_collection(Collection::Email)
.update_document(id);
mailboxes.update(src_mailbox_id, false);
mailboxes.update_batch(&mut batch, Property::MailboxIds);
if changelog.change_id == u64::MAX {
changelog.change_id = self
.jmap
.assign_change_id(src_account_id)
.await
.map_err(|_| {
StatusResponse::database_failure().with_tag(&arguments.tag)
})?
}
batch.value(Property::Cid, changelog.change_id, F_VALUE);
match self.jmap.write_batch(batch).await {
Ok(_) => {
changelog
.log_update(Collection::Email, Id::from_parts(thread_id, id));
changelog.log_child_update(
Collection::Mailbox,
src_mailbox_id.mailbox_id,
);
did_move = true;
}
Err(MethodError::ServerUnavailable) => {
response.rtype = ResponseType::No;
response.message = "Some messages could not be moved.".into();
}
Err(_) => {
return Err(
StatusResponse::database_failure().with_tag(&arguments.tag)
);
}
}
}
destroy_ids.insert(id);
}
}
// Untag or delete emails
if !destroy_ids.is_empty() {
self.email_untag_or_delete(
src_account_id,
src_mailbox.id.mailbox_id,
destroy_ids,
&mut changelog,
)
.await
.map_err(|err| err.with_tag(&arguments.tag))?;
did_move = true;
}
// Broadcast changes on destination account
if let Some(change_id) = dest_change_id {
self.jmap

View file

@ -40,7 +40,10 @@ use jmap_proto::{
state::StateChange, type_state::DataType,
},
};
use store::write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE};
use store::{
roaring::RoaringBitmap,
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_VALUE},
};
use super::ToModSeq;
@ -137,7 +140,7 @@ impl<T: SessionStream> SessionData<T> {
) -> crate::op::Result<()> {
// Obtain message ids
let account_id = mailbox.id.account_id;
let deleted_ids = self
let mut deleted_ids = self
.jmap
.get_tag(
account_id,
@ -158,77 +161,20 @@ impl<T: SessionStream> SessionData<T> {
.await?
.unwrap_or_default();
// Filter by sequence
if let Some(sequence) = &sequence {
deleted_ids &= RoaringBitmap::from_iter(sequence.keys());
}
// Delete ids
let mut changelog = ChangeLogBuilder::new();
for id in deleted_ids {
if sequence
.as_ref()
.map_or(false, |ids| !ids.contains_key(&id))
{
continue;
}
// If the message is present in multiple mailboxes, untag it from this mailbox.
let mailbox_id = mailbox.id.mailbox_id;
let (mut mailboxes, thread_id) =
if let Some(result) = self.get_mailbox_tags(account_id, id).await? {
result
} else {
continue;
};
let mailbox_id = UidMailbox::new_unassigned(mailbox_id);
if !mailboxes.current().contains(&mailbox_id) {
continue;
} else if mailboxes.current().len() > 1 {
// Remove deleted flag
let mut keywords = if let Some(keywords) = self
.jmap
.get_property::<HashedValue<Vec<Keyword>>>(
account_id,
Collection::Email,
id,
Property::Keywords,
)
.await?
{
TagManager::new(keywords)
} else {
continue;
};
// Untag message from this mailbox and remove Deleted flag
mailboxes.update(mailbox_id, false);
keywords.update(Keyword::Deleted, false);
// Write changes
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(id);
mailboxes.update_batch(&mut batch, Property::MailboxIds);
keywords.update_batch(&mut batch, Property::Keywords);
if changelog.change_id == u64::MAX {
changelog.change_id = self.jmap.assign_change_id(account_id).await?
}
batch.value(Property::Cid, changelog.change_id, F_VALUE);
match self.jmap.write_batch(batch).await {
Ok(_) => {
changelog.log_update(Collection::Email, Id::from_parts(thread_id, id));
changelog.log_child_update(Collection::Mailbox, mailbox_id.mailbox_id);
}
Err(MethodError::ServerUnavailable) => {}
Err(_) => {
return Err(StatusResponse::database_failure());
}
}
} else {
// Delete message from all mailboxes
if let Ok(changes) = self.jmap.email_delete(account_id, id).await? {
changelog.merge(changes);
}
}
}
self.email_untag_or_delete(
account_id,
mailbox.id.mailbox_id,
deleted_ids,
&mut changelog,
)
.await?;
// Write changes on source account
if !changelog.is_empty() {
@ -245,4 +191,93 @@ impl<T: SessionStream> SessionData<T> {
Ok(())
}
/// Removes the given messages from one mailbox, deleting outright those that
/// are in no other mailbox.
///
/// For each id in `deleted_ids`: if the message is tagged with `mailbox_id`
/// and also belongs to other mailboxes, it is untagged from this mailbox and
/// its `Deleted` keyword is cleared; if this was its only mailbox, the id is
/// collected and the whole set is tombstoned in one call at the end.
/// All changes are accumulated into `changelog`; the caller is responsible
/// for committing/broadcasting it.
pub async fn email_untag_or_delete(
&self,
account_id: u32,
mailbox_id: u32,
deleted_ids: RoaringBitmap,
changelog: &mut ChangeLogBuilder,
) -> crate::op::Result<()> {
let mailbox_id = UidMailbox::new_unassigned(mailbox_id);
let mut destroy_ids = RoaringBitmap::new();
// Fetch the mailbox tags of every candidate message in a single batched read.
for (id, mailbox_ids) in self
.jmap
.get_properties::<HashedValue<Vec<UidMailbox>>, _, _>(
account_id,
Collection::Email,
&deleted_ids,
Property::MailboxIds,
)
.await?
{
let mut mailboxes = TagManager::new(mailbox_ids);
// Skip messages no longer in this mailbox (concurrent modification).
if mailboxes.current().contains(&mailbox_id) {
if mailboxes.current().len() > 1 {
// Remove deleted flag
// Message lives in other mailboxes too: untag it here rather than delete.
// Both keywords and threadId must exist; otherwise skip the message.
let (mut keywords, thread_id) = if let (Some(keywords), Some(thread_id)) = (
self.jmap
.get_property::<HashedValue<Vec<Keyword>>>(
account_id,
Collection::Email,
id,
Property::Keywords,
)
.await?,
self.jmap
.get_property::<u32>(
account_id,
Collection::Email,
id,
Property::ThreadId,
)
.await?,
) {
(TagManager::new(keywords), thread_id)
} else {
continue;
};
// Untag message from this mailbox and remove Deleted flag
mailboxes.update(mailbox_id, false);
keywords.update(Keyword::Deleted, false);
// Write changes
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(id);
mailboxes.update_batch(&mut batch, Property::MailboxIds);
keywords.update_batch(&mut batch, Property::Keywords);
// Lazily assign a change id the first time a change is logged.
if changelog.change_id == u64::MAX {
changelog.change_id = self.jmap.assign_change_id(account_id).await?
}
batch.value(Property::Cid, changelog.change_id, F_VALUE);
match self.jmap.write_batch(batch).await {
Ok(_) => {
changelog.log_update(Collection::Email, Id::from_parts(thread_id, id));
changelog.log_child_update(Collection::Mailbox, mailbox_id.mailbox_id);
}
// Transient unavailability: silently skip this message (best effort).
Err(MethodError::ServerUnavailable) => {}
Err(_) => {
return Err(StatusResponse::database_failure());
}
}
} else {
// Only mailbox: queue for batched tombstoning below.
destroy_ids.insert(id);
}
}
}
if !destroy_ids.is_empty() {
// Delete message from all mailboxes
let (changes, _) = self.jmap.emails_tombstone(account_id, destroy_ids).await?;
changelog.merge(changes);
}
Ok(())
}
}

View file

@ -30,6 +30,7 @@ use utils::url_params::UrlParams;
use crate::{
api::{http::ToHttpResponse, HttpRequest, HttpResponse, JsonResponse},
services::housekeeper::{Event, PurgeType},
JMAP,
};
@ -37,8 +38,13 @@ use super::decode_path_element;
impl JMAP {
pub async fn handle_manage_store(&self, req: &HttpRequest, path: Vec<&str>) -> HttpResponse {
match (path.get(1).copied(), path.get(2).copied(), req.method()) {
(Some("blobs"), Some(blob_hash), &Method::GET) => {
match (
path.get(1).copied(),
path.get(2).copied(),
path.get(3).copied(),
req.method(),
) {
(Some("blobs"), Some(blob_hash), _, &Method::GET) => {
match URL_SAFE_NO_PAD.decode(decode_path_element(blob_hash).as_bytes()) {
Ok(blob_hash) => {
match self
@ -75,25 +81,69 @@ impl JMAP {
Err(_) => RequestError::invalid_parameters().into_http_response(),
}
}
(Some("maintenance"), _, &Method::GET) => {
match self
.core
.storage
.data
.purge_blobs(self.core.storage.blob.clone())
(Some("purge"), Some("blob"), _, &Method::GET) => {
self.housekeeper_request(Event::Purge(PurgeType::Blobs {
store: self.core.storage.data.clone(),
blob_store: self.core.storage.blob.clone(),
}))
.await
}
(Some("purge"), Some("data"), id, &Method::GET) => {
let store = if let Some(id) = id {
if let Some(store) = self.core.storage.stores.get(id) {
store.clone()
} else {
return RequestError::not_found().into_http_response();
}
} else {
self.core.storage.data.clone()
};
self.housekeeper_request(Event::Purge(PurgeType::Data(store)))
.await
}
(Some("purge"), Some("lookup"), id, &Method::GET) => {
let store = if let Some(id) = id {
if let Some(store) = self.core.storage.lookups.get(id) {
store.clone()
} else {
return RequestError::not_found().into_http_response();
}
} else {
self.core.storage.lookup.clone()
};
self.housekeeper_request(Event::Purge(PurgeType::Lookup(store)))
.await
}
(Some("purge"), Some("account"), id, &Method::GET) => {
let account_id = if let Some(id) = id {
if let Ok(account_id) = id.parse::<u32>() {
account_id.into()
} else {
return RequestError::invalid_parameters().into_http_response();
}
} else {
None
};
self.housekeeper_request(Event::Purge(PurgeType::Account(account_id)))
.await
{
Ok(_) => match self.core.storage.data.purge_store().await {
Ok(_) => JsonResponse::new(json!({
"data": (),
}))
.into_http_response(),
Err(err) => err.into_http_response(),
},
Err(err) => err.into_http_response(),
}
}
_ => RequestError::not_found().into_http_response(),
}
}
/// Forwards `event` to the housekeeper task via its channel and maps the
/// outcome to an HTTP response: an empty JSON `data` object on success, or
/// an internal-server-error response (with a log entry) if the send fails.
async fn housekeeper_request(&self, event: Event) -> HttpResponse {
match self.inner.housekeeper_tx.send(event).await {
Ok(_) => JsonResponse::new(json!({
"data": (),
}))
.into_http_response(),
Err(_) => {
// Send only fails when the receiving housekeeper task is gone.
tracing::error!("Failed to send housekeeper event");
RequestError::internal_server_error().into_http_response()
}
}
}
}

View file

@ -21,8 +21,13 @@
* for more details.
*/
use jmap_proto::error::method::MethodError;
use store::write::{log::ChangeLogBuilder, BatchBuilder};
use std::time::Duration;
use jmap_proto::{error::method::MethodError, types::collection::Collection};
use store::{
write::{log::ChangeLogBuilder, BatchBuilder},
LogKey,
};
use crate::JMAP;
@ -35,17 +40,6 @@ impl JMAP {
pub async fn assign_change_id(&self, _: u32) -> Result<u64, MethodError> {
self.generate_snowflake_id()
/*self.core.storage.data
.assign_change_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "change_log",
error = ?err,
"Failed to assign changeId.");
MethodError::ServerPartialFail
})*/
}
pub fn generate_snowflake_id(&self) -> Result<u64, MethodError> {
@ -87,4 +81,37 @@ impl JMAP {
Ok(state)
}
/// Expires changelog history for an account, deleting all change-log entries
/// older than `before` (a duration measured back from now) for each of the
/// mail-related collections.
///
/// The cutoff is expressed as a snowflake change id generated for the past
/// instant; entries with change ids in `[0, reference_cid)` are removed via
/// a range delete per collection. NOTE(review): whether `delete_range` treats
/// the end key as inclusive is not visible here — confirm against the store API.
pub async fn delete_changes(&self, account_id: u32, before: Duration) -> store::Result<()> {
let reference_cid = self.inner.snowflake_id.past_id(before).ok_or_else(|| {
store::Error::InternalError("Failed to generate reference change id.".to_string())
})?;
for collection in [
Collection::Email,
Collection::Mailbox,
Collection::Thread,
Collection::Identity,
Collection::EmailSubmission,
] {
self.core
.storage
.data
.delete_range(
LogKey {
account_id,
collection: collection.into(),
change_id: 0,
},
LogKey {
account_id,
collection: collection.into(),
change_id: reference_cid,
},
)
.await?;
}
Ok(())
}
}

View file

@ -417,7 +417,7 @@ impl JMAP {
.value(Property::Keywords, keywords, F_VALUE | F_BITMAP)
.value(Property::Cid, change_id, F_VALUE)
.set(
ValueClass::FtsQueue(FtsQueueClass::Insert {
ValueClass::FtsQueue(FtsQueueClass {
seq: self.generate_snowflake_id()?,
hash: metadata.blob_hash.clone(),
}),

View file

@ -0,0 +1,519 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::time::Duration;
use jmap_proto::{
error::method::MethodError,
types::{
collection::Collection, id::Id, keyword::Keyword, property::Property, state::StateChange,
type_state::DataType,
},
};
use store::{
ahash::AHashMap,
roaring::RoaringBitmap,
write::{
log::ChangeLogBuilder, BatchBuilder, Bincode, BitmapClass, MaybeDynamicId, TagValue,
ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
BitmapKey, IterateParams, ValueKey, U32_LEN,
};
use utils::codec::leb128::Leb128Reader;
use crate::{
mailbox::{UidMailbox, JUNK_ID, TOMBSTONE_ID, TRASH_ID},
JMAP,
};
use super::{index::EmailIndexBuilder, metadata::MessageMetadata};
use rand::prelude::SliceRandom;
impl JMAP {
/// Marks a set of messages as tombstoned: untags them from their mailboxes
/// and threads, logs the deletions, and tags them with the internal
/// `TOMBSTONE_ID` mailbox so a later purge pass can physically remove them.
///
/// Returns the accumulated changelog plus the subset of `document_ids` that
/// could NOT be processed (each successfully tombstoned id is removed from
/// the bitmap before it is returned).
pub async fn emails_tombstone(
&self,
account_id: u32,
mut document_ids: RoaringBitmap,
) -> Result<(ChangeLogBuilder, RoaringBitmap), MethodError> {
// Create batch
let mut changes = ChangeLogBuilder::with_change_id(0);
let mut delete_properties = AHashMap::new();
// Fetch mailboxes and threadIds
// thread_ids maps threadId -> (# of messages being deleted in that thread);
// it is decremented below while scanning the on-disk thread tags.
let mut thread_ids: AHashMap<u32, i32> = AHashMap::new();
for (document_id, mailboxes) in self
.get_properties::<Vec<UidMailbox>, _, _>(
account_id,
Collection::Email,
&document_ids,
Property::MailboxIds,
)
.await?
{
delete_properties.insert(
document_id,
DeleteProperties {
mailboxes,
thread_id: None,
},
);
}
for (document_id, thread_id) in self
.get_properties::<u32, _, _>(
account_id,
Collection::Email,
&document_ids,
Property::ThreadId,
)
.await?
{
*thread_ids.entry(thread_id).or_default() += 1;
delete_properties
.entry(document_id)
.or_insert_with(DeleteProperties::default)
.thread_id = Some(thread_id);
}
// Obtain all threadIds
// Scan every threadId tag key for the account and decrement the per-thread
// counter once per key. After the scan, a counter of 0 means the thread
// will be left empty by this deletion; a negative value means other
// messages remain in the thread.
self.core
.storage
.data
.iterate(
IterateParams::new(
BitmapKey {
account_id,
collection: Collection::Email.into(),
class: BitmapClass::Tag {
field: Property::ThreadId.into(),
value: TagValue::Id(0),
},
document_id: 0,
},
BitmapKey {
account_id,
collection: Collection::Email.into(),
class: BitmapClass::Tag {
field: Property::ThreadId.into(),
value: TagValue::Id(u32::MAX),
},
document_id: u32::MAX,
},
)
.no_values(),
|key, _| {
// The threadId is LEB128-encoded after the account/collection prefix.
let (thread_id, _) = key
.get(U32_LEN + 2..)
.and_then(|bytes| bytes.read_leb128::<u32>())
.ok_or_else(|| {
store::Error::InternalError("Failed to read threadId.".to_string())
})?;
if let Some(thread_count) = thread_ids.get_mut(&thread_id) {
*thread_count -= 1;
}
Ok(true)
},
)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "email_delete",
error = ?err,
"Failed to iterate threadIds."
);
MethodError::ServerPartialFail
})?;
// Tombstone message and untag it from the mailboxes
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
for (document_id, delete_properties) in delete_properties {
batch.update_document(document_id);
if !delete_properties.mailboxes.is_empty() {
for mailbox_id in &delete_properties.mailboxes {
// uid 0 is reserved for unassigned entries and must not be stored.
debug_assert!(mailbox_id.uid != 0);
changes.log_child_update(Collection::Mailbox, mailbox_id.mailbox_id);
}
// Clear both the stored value and the mailbox tag bitmaps.
batch.value(
Property::MailboxIds,
delete_properties.mailboxes,
F_VALUE | F_BITMAP | F_CLEAR,
);
} else {
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch mailboxIds.",
);
}
if let Some(thread_id) = delete_properties.thread_id {
batch.value(Property::ThreadId, thread_id, F_VALUE | F_BITMAP | F_CLEAR);
// Log message deletion
changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id));
// Log thread changes
// Negative counter: the thread still has messages left, log it as updated.
if thread_ids[&thread_id] < 0 {
changes.log_child_update(Collection::Thread, thread_id);
}
} else {
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch threadId.",
);
}
// Tag the message with the tombstone pseudo-mailbox for the purge pass.
batch.tag(
Property::MailboxIds,
TagValue::Id(MaybeDynamicId::Static(TOMBSTONE_ID)),
0,
);
document_ids.remove(document_id);
// Flush the batch periodically to keep write transactions bounded.
if batch.ops.len() >= 1000 {
self.core
.storage
.data
.write(batch.build())
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "email_delete",
error = ?err,
"Failed to commit batch.");
MethodError::ServerPartialFail
})?;
batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
}
}
// Delete threadIds
// A counter of exactly 0 means every remaining tag entry belonged to a
// deleted message, so the thread document itself can be removed.
for (thread_id, thread_count) in thread_ids {
if thread_count == 0 {
batch
.with_collection(Collection::Thread)
.delete_document(thread_id);
changes.log_delete(Collection::Thread, thread_id);
}
}
if !batch.ops.is_empty() {
self.core
.storage
.data
.write(batch.build())
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "email_delete",
error = ?err,
"Failed to commit batch.");
MethodError::ServerPartialFail
})?;
}
Ok((changes, document_ids))
}
/// Runs the purge pass over every account in the store, in random order.
///
/// The randomized order spreads work so that multiple nodes starting at the
/// same time are unlikely to contend on the same accounts. Errors fetching
/// the principal list are ignored (nothing to purge).
pub async fn purge_accounts(&self) {
    // Fetch all principal document ids; bail out on error or empty store.
    let principal_ids = match self.get_document_ids(u32::MAX, Collection::Principal).await {
        Ok(Some(ids)) => ids,
        _ => return,
    };
    // Shuffle before processing so purge order differs per run.
    let mut shuffled: Vec<u32> = principal_ids.into_iter().collect();
    shuffled.shuffle(&mut rand::thread_rng());
    for id in shuffled {
        self.purge_account(id).await;
    }
}
/// Performs all periodic maintenance for a single account: auto-expunges old
/// Trash/Junk messages, purges tombstoned messages, and expires old changelog
/// entries. All failures are logged and swallowed (best-effort maintenance).
pub async fn purge_account(&self, account_id: u32) {
// Lock account
// Acquire a distributed lock by incrementing a counter with a 1-hour TTL;
// a result of 1 means we own the lock, anything else means another node
// (or a recent run) is already purging this account.
match self
.core
.storage
.lookup
.counter_incr(
format!("purge:{account_id}").into_bytes(),
1,
Some(3600),
true,
)
.await
{
Ok(1) => (),
Ok(count) => {
tracing::debug!(
event = "skipped",
context = "email_purge_account",
account_id = account_id,
count,
"Account is already being purged."
);
return;
}
Err(err) => {
tracing::error!(
event = "error",
context = "email_purge_account",
account_id = account_id,
error = ?err,
"Failed to lock account."
);
return;
}
}
// Auto-expunge deleted and junk messages
// Only runs when `jmap.email.auto-expunge` is configured.
if let Some(period) = self.core.jmap.mail_autoexpunge_after {
if self.emails_auto_expunge(account_id, period).await.is_err() {
tracing::error!(
event = "error",
context = "email_auto_expunge",
account_id = account_id,
"Failed to auto-expunge messages."
);
}
}
// Purge tombstoned messages
// Physically removes messages that a previous pass marked as tombstoned.
if let Err(err) = self.emails_purge_tombstoned(account_id).await {
tracing::error!(
event = "error",
context = "email_purge_tombstoned",
account_id = account_id,
error = ?err,
"Failed to purge tombstoned messages."
);
}
// Purge changelogs
// Only runs when `jmap.protocol.changes.max-history` is configured.
if let Some(history) = self.core.jmap.changes_max_history {
if let Err(err) = self.delete_changes(account_id, history).await {
tracing::error!(
event = "error",
context = "email_purge_account",
account_id = account_id,
error = ?err,
"Failed to purge changes."
);
}
}
}
/// Tombstones messages in the Trash and Junk folders that are older than
/// `period`, then commits and broadcasts the resulting changes.
///
/// Message age is determined by comparing each message's stored change id
/// (`Property::Cid`, a snowflake id that encodes time) against a reference
/// snowflake id generated for `now - period`.
pub async fn emails_auto_expunge(
&self,
account_id: u32,
period: Duration,
) -> Result<(), MethodError> {
// Candidates are all messages tagged with the Trash or Junk mailbox.
let deletion_candidates = self
.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
TagValue::Id(TRASH_ID),
)
.await?
.unwrap_or_default()
| self
.get_tag(
account_id,
Collection::Email,
Property::MailboxIds,
TagValue::Id(JUNK_ID),
)
.await?
.unwrap_or_default();
if deletion_candidates.is_empty() {
return Ok(());
}
// Snowflake id corresponding to the cutoff instant `now - period`.
let reference_cid = self.inner.snowflake_id.past_id(period).ok_or_else(|| {
tracing::error!(
event = "error",
context = "email_auto_expunge",
account_id = account_id,
"Failed to generate reference cid."
);
MethodError::ServerPartialFail
})?;
// Find messages to destroy
let mut destroy_ids = RoaringBitmap::new();
for (document_id, cid) in self
.get_properties::<u64, _, _>(
account_id,
Collection::Email,
&deletion_candidates,
Property::Cid,
)
.await?
{
// Older snowflake ids compare smaller, so cid < reference means expired.
if cid < reference_cid {
destroy_ids.insert(document_id);
}
}
if destroy_ids.is_empty() {
return Ok(());
}
// Tombstone messages
let (changes, _) = self.emails_tombstone(account_id, destroy_ids).await?;
// Write and broadcast changes
if !changes.is_empty() {
let change_id = self.commit_changes(account_id, changes).await?;
self.broadcast_state_change(
StateChange::new(account_id)
.with_change(DataType::Email, change_id)
.with_change(DataType::Mailbox, change_id)
.with_change(DataType::Thread, change_id),
)
.await;
}
Ok(())
}
/// Physically deletes all messages previously tombstoned by
/// `emails_tombstone`: removes their full-text index entries, keyword
/// bitmaps, metadata-derived index entries, stored values and the tombstone
/// tag itself, one write batch per message.
pub async fn emails_purge_tombstoned(&self, account_id: u32) -> store::Result<()> {
// Obtain tombstoned messages
let tombstoned_ids = self
.core
.storage
.data
.get_bitmap(BitmapKey {
account_id,
collection: Collection::Email.into(),
class: BitmapClass::Tag {
field: Property::MailboxIds.into(),
value: TagValue::Id(TOMBSTONE_ID),
},
document_id: 0,
})
.await?
.unwrap_or_default();
if tombstoned_ids.is_empty() {
return Ok(());
}
// Delete full-text index
self.core
.storage
.fts
.remove(account_id, Collection::Email.into(), &tombstoned_ids)
.await?;
// Delete messages
for document_id in tombstoned_ids {
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.delete_document(document_id)
.clear(Property::Cid)
.tag(
Property::MailboxIds,
TagValue::Id(MaybeDynamicId::Static(TOMBSTONE_ID)),
F_CLEAR,
);
// Remove keywords
// Clears the keyword bitmaps; without the stored value the bitmap keys
// cannot be derived, so a missing value is only logged.
if let Some(keywords) = self
.core
.storage
.data
.get_value::<Vec<Keyword>>(ValueKey {
account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::Keywords.into()),
})
.await?
{
batch.value(Property::Keywords, keywords, F_VALUE | F_BITMAP | F_CLEAR);
} else {
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch keywords.",
);
}
// Remove message metadata
// The stored metadata drives EmailIndexBuilder::clear, which removes the
// remaining index entries for the message.
if let Some(metadata) = self
.core
.storage
.data
.get_value::<Bincode<MessageMetadata>>(ValueKey {
account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::BodyStructure.into()),
})
.await?
{
batch.custom(EmailIndexBuilder::clear(metadata.inner));
// Commit batch
self.core.storage.data.write(batch.build()).await?;
} else {
// NOTE(review): in this branch the batch is never committed, so the
// document and its tombstone tag survive until the next purge run —
// confirm whether this retry-later behavior is intentional.
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch message metadata.",
);
}
}
Ok(())
}
}
/// Per-message state gathered before tombstoning: the mailboxes the message
/// is tagged with and, when found, its thread id.
#[derive(Default, Debug)]
struct DeleteProperties {
// Mailboxes to untag; empty when the MailboxIds lookup returned nothing.
mailboxes: Vec<UidMailbox>,
// None when the ThreadId property could not be fetched.
thread_id: Option<u32>,
}

View file

@ -340,7 +340,7 @@ impl JMAP {
.set(Property::ThreadId, maybe_thread_id)
.tag(Property::ThreadId, TagValue::Id(maybe_thread_id), 0)
.set(
ValueClass::FtsQueue(FtsQueueClass::Insert {
ValueClass::FtsQueue(FtsQueueClass {
seq: self
.generate_snowflake_id()
.map_err(|_| IngestError::Temporary)?,

View file

@ -25,6 +25,7 @@ pub mod body;
pub mod cache;
pub mod copy;
pub mod crypto;
pub mod delete;
pub mod get;
pub mod headers;
pub mod import;

View file

@ -33,7 +33,6 @@ use jmap_proto::{
types::{
acl::Acl,
collection::Collection,
id::Id,
keyword::Keyword,
property::Property,
state::{State, StateChange},
@ -52,22 +51,19 @@ use mail_builder::{
use mail_parser::MessageParser;
use store::{
ahash::AHashSet,
roaring::RoaringBitmap,
write::{
assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, Bincode, DeserializeFrom,
FtsQueueClass, SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, DeserializeFrom, SerializeInto,
ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
Serialize,
};
use crate::{
auth::AccessToken, mailbox::UidMailbox, services::housekeeper::Event, IngestError, JMAP,
};
use crate::{auth::AccessToken, mailbox::UidMailbox, IngestError, JMAP};
use super::{
headers::{BuildHeader, ValueToHeader},
index::EmailIndexBuilder,
ingest::IngestEmail,
metadata::MessageMetadata,
};
impl JMAP {
@ -1025,21 +1021,15 @@ impl JMAP {
} else {
None
};
let mut destroy_ids = RoaringBitmap::new();
for destroy_id in will_destroy {
let document_id = destroy_id.document_id();
if email_ids.contains(document_id) {
if !matches!(&can_destroy_message_ids, Some(ids) if !ids.contains(document_id))
{
match self.email_delete(account_id, document_id).await? {
Ok(change) => {
changes.merge(change);
response.destroyed.push(destroy_id);
}
Err(err) => {
response.not_destroyed.append(destroy_id, err);
}
}
destroy_ids.insert(document_id);
response.destroyed.push(destroy_id);
} else {
response.not_destroyed.append(
destroy_id,
@ -1053,6 +1043,32 @@ impl JMAP {
.append(destroy_id, SetError::not_found());
}
}
if !destroy_ids.is_empty() {
// Batch delete (tombstone) messages
let (change, not_destroyed) =
self.emails_tombstone(account_id, destroy_ids).await?;
// Merge changes
changes.merge(change);
// Mark messages that were not found as not destroyed (this should not occur in practice)
if !not_destroyed.is_empty() {
let mut destroyed = Vec::with_capacity(response.destroyed.len());
for destroy_id in response.destroyed {
if not_destroyed.contains(destroy_id.document_id()) {
response
.not_destroyed
.append(destroy_id, SetError::not_found());
} else {
destroyed.push(destroy_id);
}
}
response.destroyed = destroyed;
}
}
}
// Update state
@ -1075,198 +1091,7 @@ impl JMAP {
Ok(response)
}
/// Deletes a single e-mail message together with all of its indexed data.
///
/// Builds a single write batch that tombstones the Email document, queues
/// an FTS removal, clears the cached changeId, removes the mailbox tags,
/// keywords, threadId value/tag and message metadata, and deletes the
/// Thread document when this message was the last member of its thread.
///
/// Returns `Ok(Ok(changes))` with the changelog entries for the caller to
/// merge, `Ok(Err(SetError))` when the message no longer exists
/// (`not_found`) or a concurrent writer raced us (`forbidden` on assert
/// failure), or `Err(MethodError::ServerPartialFail)` when the store
/// write fails.
pub async fn email_delete(
    &self,
    account_id: u32,
    document_id: u32,
) -> Result<Result<ChangeLogBuilder, SetError>, MethodError> {
    // Create batch
    let mut batch = BatchBuilder::new();
    // Change id 0 is a placeholder; the caller assigns the real change id.
    let mut changes = ChangeLogBuilder::with_change_id(0);
    // Delete document
    batch
        .with_account_id(account_id)
        .with_collection(Collection::Email)
        .delete_document(document_id)
        .set(
            // Queue the document for removal from the FTS index; the
            // snowflake id serves as the queue sequence number.
            ValueClass::FtsQueue(FtsQueueClass::Delete {
                seq: self.generate_snowflake_id()?,
            }),
            0u64.serialize(),
        );
    // Remove last changeId
    batch.value(Property::Cid, (), F_VALUE | F_CLEAR);
    // Remove mailboxes
    let mailboxes = if let Some(mailboxes) = self
        .get_property::<HashedValue<Vec<UidMailbox>>>(
            account_id,
            Collection::Email,
            document_id,
            Property::MailboxIds,
        )
        .await?
    {
        mailboxes
    } else {
        // A missing mailboxIds property is treated as "message not found".
        tracing::debug!(
            event = "error",
            context = "email_delete",
            account_id = account_id,
            document_id = document_id,
            "Failed to fetch mailboxIds.",
        );
        return Ok(Err(SetError::not_found()));
    };
    for mailbox_id in &mailboxes.inner {
        // A zero uid would violate the invariant asserted here.
        debug_assert!(mailbox_id.uid != 0);
        changes.log_child_update(Collection::Mailbox, mailbox_id.mailbox_id);
    }
    // The assert_value guard detects concurrent modification at commit time.
    batch.assert_value(Property::MailboxIds, &mailboxes).value(
        Property::MailboxIds,
        mailboxes.inner,
        F_VALUE | F_BITMAP | F_CLEAR,
    );
    // Remove keywords
    if let Some(keywords) = self
        .get_property::<HashedValue<Vec<Keyword>>>(
            account_id,
            Collection::Email,
            document_id,
            Property::Keywords,
        )
        .await?
    {
        batch.assert_value(Property::Keywords, &keywords).value(
            Property::Keywords,
            keywords.inner,
            F_VALUE | F_BITMAP | F_CLEAR,
        );
    } else {
        tracing::debug!(
            event = "error",
            context = "email_delete",
            account_id = account_id,
            document_id = document_id,
            "Failed to fetch keywords.",
        );
        return Ok(Err(SetError::not_found()));
    };
    // Remove threadIds
    let mut delete_thread_id = None;
    if let Some(thread_id) = self
        .get_property::<u32>(
            account_id,
            Collection::Email,
            document_id,
            Property::ThreadId,
        )
        .await?
    {
        // Obtain all documentIds in thread
        if let Some(thread_tags) = self
            .get_tag(account_id, Collection::Email, Property::ThreadId, thread_id)
            .await?
        {
            if thread_tags.len() > 1 {
                // Thread has other document ids, remove this one
                changes.log_child_update(Collection::Thread, thread_id);
            } else {
                // Thread is empty, delete it
                delete_thread_id = thread_id.into();
                changes.log_delete(Collection::Thread, thread_id);
            }
            // Remove threadId value and tag
            batch.assert_value(Property::ThreadId, thread_id).value(
                Property::ThreadId,
                thread_id,
                F_VALUE | F_BITMAP | F_CLEAR,
            );
            // Log message deletion
            changes.log_delete(Collection::Email, Id::from_parts(thread_id, document_id));
        } else {
            tracing::debug!(
                event = "error",
                context = "email_delete",
                account_id = account_id,
                thread_id = thread_id,
                document_id = document_id,
                "Failed to fetch thread tags.",
            );
            return Ok(Err(SetError::not_found()));
        }
    } else {
        tracing::debug!(
            event = "error",
            context = "email_delete",
            account_id = account_id,
            document_id = document_id,
            "Failed to fetch threadId.",
        );
        return Ok(Err(SetError::not_found()));
    }
    // Remove message metadata
    if let Some(metadata) = self
        .get_property::<Bincode<MessageMetadata>>(
            account_id,
            Collection::Email,
            document_id,
            Property::BodyStructure,
        )
        .await?
    {
        // Clears the index entries derived from the stored metadata.
        batch.custom(EmailIndexBuilder::clear(metadata.inner));
    } else {
        tracing::debug!(
            event = "error",
            context = "email_delete",
            account_id = account_id,
            document_id = document_id,
            "Failed to fetch message metadata.",
        );
        return Ok(Err(SetError::not_found()));
    };
    // Delete threadId
    if let Some(thread_id) = delete_thread_id {
        batch
            .with_collection(Collection::Thread)
            .delete_document(thread_id);
    }
    // Commit batch
    match self.core.storage.data.write(batch.build()).await {
        Ok(_) => (),
        Err(store::Error::AssertValueFailed) => {
            // One of the assert_value guards failed: a concurrent writer
            // modified this message between our reads and this commit.
            return Ok(Err(SetError::forbidden().with_description(
                "Another process modified this message, please try again.",
            )));
        }
        Err(err) => {
            tracing::error!(
                event = "error",
                context = "email_delete",
                error = ?err,
                "Failed to commit batch.");
            return Err(MethodError::ServerPartialFail);
        }
    }
    // Request FTS index
    // Best-effort wake-up of the housekeeper to drain the FTS queue.
    let _ = self.inner.housekeeper_tx.send(Event::IndexStart).await;
    Ok(Ok(changes))
}
}
pub struct TagManager<
T: PartialEq + Clone + ToBitmaps + SerializeInto + Serialize + DeserializeFrom + Sync + Send,
> {

View file

@ -49,6 +49,7 @@ use services::{
use smtp::core::SMTP;
use store::{
dispatch::DocumentSet,
fts::FtsFilter,
query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet},
roaring::RoaringBitmap,
@ -228,7 +229,7 @@ impl JMAP {
property: P,
) -> Result<Vec<(u32, U)>, MethodError>
where
I: PropertiesIterator + Send + Sync,
I: DocumentSet + Send + Sync,
P: AsRef<Property>,
U: Deserialize + 'static,
{
@ -619,47 +620,3 @@ impl UpdateResults for QueryResponse {
}
}
}
// `len` is informational here (the `()` wildcard reports 0 while matching
// every id), so an `is_empty` companion would be misleading.
#[allow(clippy::len_without_is_empty)]
/// A set of document ids used to restrict bulk property lookups.
///
/// `min` is inclusive; `max` is exclusive (implementations return the
/// largest id plus one, or 0 when the set is empty). The `()`
/// implementation is a wildcard that matches every id.
pub trait PropertiesIterator {
    fn min(&self) -> u32;
    fn max(&self) -> u32;
    fn contains(&self, id: u32) -> bool;
    fn len(&self) -> usize;
}
/// Exact id set backed by a bitmap.
impl PropertiesIterator for RoaringBitmap {
    fn min(&self) -> u32 {
        // `self.min()` resolves to RoaringBitmap's inherent method
        // (inherent methods take precedence over this trait method).
        self.min().unwrap_or(0)
    }
    fn max(&self) -> u32 {
        // Exclusive upper bound: largest id + 1, or 0 when empty.
        self.max().map(|m| m + 1).unwrap_or(0)
    }
    fn contains(&self, id: u32) -> bool {
        self.contains(id)
    }
    fn len(&self) -> usize {
        self.len() as usize
    }
}
/// Wildcard set: matches every id over the full `0..u32::MAX` range
/// while reporting a length of zero.
impl PropertiesIterator for () {
    fn min(&self) -> u32 {
        0
    }
    fn max(&self) -> u32 {
        u32::MAX
    }
    fn contains(&self, _: u32) -> bool {
        true
    }
    fn len(&self) -> usize {
        0
    }
}

View file

@ -40,6 +40,7 @@ pub const TRASH_ID: u32 = 1;
pub const JUNK_ID: u32 = 2;
pub const DRAFTS_ID: u32 = 3;
pub const SENT_ID: u32 = 4;
pub const TOMBSTONE_ID: u32 = u32::MAX - 1;
#[derive(Debug, Clone, Copy)]
pub struct UidMailbox {

View file

@ -370,90 +370,20 @@ impl JMAP {
// If the message is in multiple mailboxes, untag it from the current mailbox,
// otherwise delete it.
for message_id in message_ids {
// Obtain mailboxIds
if let Some(mailbox_ids) = self
.get_property::<HashedValue<Vec<UidMailbox>>>(
account_id,
Collection::Email,
message_id,
Property::MailboxIds,
)
.await?
.and_then(|mut ids| {
let idx = ids.inner.iter().position(|&id| {
debug_assert!(id.uid != 0);
id.mailbox_id == document_id
})?;
ids.inner.swap_remove(idx);
Some(ids)
})
{
if !mailbox_ids.inner.is_empty() {
// Obtain threadId
if let Some(thread_id) = self
.get_property::<u32>(
account_id,
Collection::Email,
message_id,
Property::ThreadId,
)
.await?
{
// Untag message from mailbox
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(message_id)
.assert_value(Property::MailboxIds, &mailbox_ids)
.value(Property::MailboxIds, mailbox_ids.inner, F_VALUE)
.value(Property::MailboxIds, document_id, F_BITMAP | F_CLEAR);
match self.core.storage.data.write(batch.build()).await {
Ok(_) => changes.log_update(
Collection::Email,
Id::from_parts(thread_id, message_id),
),
Err(store::Error::AssertValueFailed) => {
return Ok(Err(SetError::forbidden().with_description(
concat!(
"Another process modified a message in this mailbox ",
"while deleting it, please try again."
),
)));
}
Err(err) => {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
error = ?err,
"Failed to update message while deleting mailbox.");
return Err(MethodError::ServerPartialFail);
}
}
} else {
tracing::debug!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
"Message does not have a threadId, skipping."
);
}
} else {
// Delete message
if let Ok(mut change) =
self.email_delete(account_id, message_id).await?
{
change.changes.remove(&(Collection::Mailbox as u8));
changes.merge(change);
}
}
} else {
let mut destroy_ids = RoaringBitmap::new();
for (message_id, mut mailbox_ids) in self
.get_properties::<HashedValue<Vec<UidMailbox>>, _, _>(
account_id,
Collection::Email,
&message_ids,
Property::MailboxIds,
)
.await?
{
// Remove mailbox from list
let orig_len = mailbox_ids.inner.len();
mailbox_ids.inner.retain(|id| id.mailbox_id != document_id);
if mailbox_ids.inner.len() == orig_len {
tracing::debug!(
event = "error",
context = "mailbox_set",
@ -462,7 +392,76 @@ impl JMAP {
message_id = message_id,
"Message is not in the mailbox, skipping."
);
continue;
}
if !mailbox_ids.inner.is_empty() {
// Obtain threadId
if let Some(thread_id) = self
.get_property::<u32>(
account_id,
Collection::Email,
message_id,
Property::ThreadId,
)
.await?
{
// Untag message from mailbox
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email)
.update_document(message_id)
.assert_value(Property::MailboxIds, &mailbox_ids)
.value(Property::MailboxIds, mailbox_ids.inner, F_VALUE)
.value(Property::MailboxIds, document_id, F_BITMAP | F_CLEAR);
match self.core.storage.data.write(batch.build()).await {
Ok(_) => changes.log_update(
Collection::Email,
Id::from_parts(thread_id, message_id),
),
Err(store::Error::AssertValueFailed) => {
return Ok(Err(SetError::forbidden().with_description(
concat!(
"Another process modified a message in this mailbox ",
"while deleting it, please try again."
),
)));
}
Err(err) => {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
error = ?err,
"Failed to update message while deleting mailbox.");
return Err(MethodError::ServerPartialFail);
}
}
} else {
tracing::debug!(
event = "error",
context = "mailbox_set",
account_id = account_id,
mailbox_id = document_id,
message_id = message_id,
"Message does not have a threadId, skipping."
);
}
} else {
// Delete message
destroy_ids.insert(message_id);
}
}
// Bulk delete messages
if !destroy_ids.is_empty() {
let (mut change, _) = self.emails_tombstone(account_id, destroy_ids).await?;
change.changes.remove(&(Collection::Mailbox as u8));
changes.merge(change);
}
} else {
return Ok(Err(SetError::new(SetErrorType::MailboxHasEmail)

View file

@ -26,7 +26,7 @@ use std::{
time::{Duration, Instant},
};
use store::write::purge::PurgeStore;
use store::{write::purge::PurgeStore, BlobStore, LookupStore, Store};
use tokio::sync::mpsc;
use utils::map::ttl_dashmap::TtlMap;
@ -42,11 +42,19 @@ pub enum Event {
provider_id: String,
renew_at: Instant,
},
Purge(PurgeType),
#[cfg(feature = "test_mode")]
IndexIsActive(tokio::sync::oneshot::Sender<bool>),
Exit,
}
/// Maintenance task sent to the housekeeper via `Event::Purge`; each
/// variant carries the store it operates on and is dispatched in the
/// event loop to the matching `purge_*` call.
pub enum PurgeType {
    /// Expire stale entries in the data store (`purge_store`).
    Data(Store),
    /// Delete unreferenced blobs (`purge_blobs`).
    Blobs { store: Store, blob_store: BlobStore },
    /// Expire temporary lookup/counter values (`purge_lookup_store`).
    Lookup(LookupStore),
    /// Purge one account (`Some(id)`) or all accounts (`None`).
    Account(Option<u32>),
}
#[derive(PartialEq, Eq)]
struct Action {
due: Instant,
@ -56,6 +64,7 @@ struct Action {
#[derive(PartialEq, Eq, Debug)]
enum ActionClass {
Session,
Account,
Store(usize),
Acme(String),
}
@ -85,6 +94,10 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
Instant::now() + core_.jmap.session_purge_frequency.time_to_next(),
ActionClass::Session,
);
queue.schedule(
Instant::now() + core_.jmap.account_purge_frequency.time_to_next(),
ActionClass::Account,
);
for (idx, schedule) in core_.storage.purge_schedules.iter().enumerate() {
queue.schedule(
Instant::now() + schedule.cron.time_to_next(),
@ -172,6 +185,40 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
index_busy = false;
}
}
Event::Purge(purge) => match purge {
PurgeType::Data(store) => {
tokio::spawn(async move {
if let Err(err) = store.purge_store().await {
tracing::error!("Failed to purge data store: {err}",);
}
});
}
PurgeType::Blobs { store, blob_store } => {
tokio::spawn(async move {
if let Err(err) = store.purge_blobs(blob_store).await {
tracing::error!("Failed to purge blob store: {err}",);
}
});
}
PurgeType::Lookup(store) => {
tokio::spawn(async move {
if let Err(err) = store.purge_lookup_store().await {
tracing::error!("Failed to purge lookup store: {err}",);
}
});
}
PurgeType::Account(account_id) => {
let jmap = JMAP::from(core.clone());
tokio::spawn(async move {
tracing::debug!("Purging accounts.");
if let Some(account_id) = account_id {
jmap.purge_account(account_id).await;
} else {
jmap.purge_accounts().await;
}
});
}
},
#[cfg(feature = "test_mode")]
Event::IndexIsActive(tx) => {
tx.send(index_busy).ok();
@ -236,6 +283,18 @@ pub fn spawn_housekeeper(core: JmapInstance, mut rx: mpsc::Receiver<Event>) {
}
});
}
ActionClass::Account => {
let jmap = JMAP::from(core.clone());
tokio::spawn(async move {
tracing::debug!("Purging accounts.");
jmap.purge_accounts().await;
});
queue.schedule(
Instant::now()
+ core_.jmap.account_purge_frequency.time_to_next(),
ActionClass::Account,
);
}
ActionClass::Session => {
let inner = core.jmap_inner.clone();
tokio::spawn(async move {

View file

@ -23,7 +23,6 @@
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
ahash::{AHashMap, AHashSet},
fts::index::FtsDocument,
write::{
key::DeserializeBigEndian, now, BatchBuilder, Bincode, FtsQueueClass, MaybeDynamicId,
@ -47,7 +46,7 @@ struct IndexEmail {
document_id: u32,
seq: u64,
lock_expiry: u64,
insert_hash: Option<BlobHash>,
insert_hash: BlobHash,
}
const INDEX_LOCK_EXPIRY: u64 = 60 * 5;
@ -58,19 +57,23 @@ impl JMAP {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::FtsQueue(FtsQueueClass::Delete { seq: 0 }),
class: ValueClass::FtsQueue(FtsQueueClass {
seq: 0,
hash: BlobHash::default(),
}),
};
let to_key = ValueKey::<ValueClass<u32>> {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::FtsQueue(FtsQueueClass::Delete { seq: u64::MAX }),
class: ValueClass::FtsQueue(FtsQueueClass {
seq: u64::MAX,
hash: BlobHash::default(),
}),
};
// Retrieve entries pending to be indexed
let mut insert_entries = Vec::new();
let mut delete_entries: AHashMap<u32, Vec<IndexEmail>> = AHashMap::new();
let mut skipped_documents = AHashSet::new();
let mut entries = Vec::new();
let now = now();
let _ = self
.core
@ -82,26 +85,8 @@ impl JMAP {
let event = IndexEmail::deserialize(key, value)?;
if event.lock_expiry < now {
if !skipped_documents.contains(&(event.account_id, event.document_id)) {
if event.insert_hash.is_some() {
insert_entries.push(event);
} else {
delete_entries
.entry(event.account_id)
.or_default()
.push(event);
}
} else {
tracing::trace!(
context = "queue",
event = "skipped",
account_id = event.account_id,
document_id = event.document_id,
"DocumentId already locked by another process."
);
}
entries.push(event);
} else {
skipped_documents.insert((event.account_id, event.document_id));
tracing::trace!(
context = "queue",
event = "locked",
@ -126,7 +111,7 @@ impl JMAP {
});
// Add entries to the index
for event in insert_entries {
for event in entries {
// Lock index
if !self.try_lock_index(&event).await {
continue;
@ -142,8 +127,7 @@ impl JMAP {
.await
{
Ok(Some(metadata))
if metadata.inner.blob_hash.as_slice()
== event.insert_hash.as_ref().unwrap().as_slice() =>
if metadata.inner.blob_hash.as_slice() == event.insert_hash.as_slice() =>
{
// Obtain raw message
let raw_message = if let Ok(Some(raw_message)) = self
@ -240,71 +224,6 @@ impl JMAP {
}
}
// Remove entries from the index
for (account_id, events) in delete_entries {
let mut document_ids = Vec::with_capacity(events.len());
let mut locked_events = Vec::with_capacity(events.len());
for event in events {
if self.try_lock_index(&event).await {
document_ids.push(event.document_id);
locked_events.push(event);
}
}
if document_ids.is_empty() {
continue;
}
if let Err(err) = self
.core
.storage
.fts
.remove(account_id, Collection::Email.into(), &document_ids)
.await
{
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = account_id,
document_ids = ?document_ids,
reason = ?err,
"Failed to remove documents from FTS index"
);
continue;
}
// Remove entries from the queue
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
for event in locked_events {
batch
.update_document(event.document_id)
.clear(event.value_class());
}
// Remove entry from queue
if let Err(err) = self.core.storage.data.write(batch.build()).await {
tracing::error!(
context = "fts_index_queued",
event = "error",
reason = ?err,
"Failed to remove index email from queue"
);
continue;
}
tracing::debug!(
context = "fts_index_queued",
event = "delete",
account_id = account_id,
document_ids = ?document_ids,
"Deleted document from FTS index"
);
}
if let Err(err) = self.inner.housekeeper_tx.send(Event::IndexDone).await {
tracing::warn!("Failed to send index done event to housekeeper: {}", err);
}
@ -345,13 +264,10 @@ impl JMAP {
impl IndexEmail {
fn value_class(&self) -> ValueClass<MaybeDynamicId> {
match &self.insert_hash {
Some(hash) => ValueClass::FtsQueue(FtsQueueClass::Insert {
hash: hash.clone(),
seq: self.seq,
}),
None => ValueClass::FtsQueue(FtsQueueClass::Delete { seq: self.seq }),
}
ValueClass::FtsQueue(FtsQueueClass {
hash: self.insert_hash.clone(),
seq: self.seq,
})
}
fn deserialize(key: &[u8], value: &[u8]) -> store::Result<Self> {
@ -365,7 +281,8 @@ impl IndexEmail {
U64_LEN + U32_LEN + U32_LEN + 1
..U64_LEN + U32_LEN + U32_LEN + BLOB_HASH_LEN + 1,
)
.and_then(|bytes| BlobHash::try_from_hash_slice(bytes).ok()),
.and_then(|bytes| BlobHash::try_from_hash_slice(bytes).ok())
.ok_or_else(|| store::Error::InternalError("Invalid blob hash".to_string()))?,
})
}
}

View file

@ -29,6 +29,7 @@ use serde_json::json;
use crate::{
backend::elastic::INDEX_NAMES,
dispatch::DocumentSet,
fts::{index::FtsDocument, Field},
};
@ -77,8 +78,10 @@ impl ElasticSearchStore {
&self,
account_id: u32,
collection: u8,
document_ids: &[u32],
document_ids: &impl DocumentSet,
) -> crate::Result<()> {
let document_ids = document_ids.iterate().collect::<Vec<_>>();
self.index
.delete_by_query(DeleteByQueryParts::Index(&[
INDEX_NAMES[collection as usize]

View file

@ -30,6 +30,8 @@ use crate::{
FtsStore,
};
use super::DocumentSet;
impl FtsStore {
pub async fn index<T: Into<u8> + Display + Clone + std::fmt::Debug>(
&self,
@ -61,7 +63,7 @@ impl FtsStore {
&self,
account_id: u32,
collection: u8,
document_ids: &[u32],
document_ids: &impl DocumentSet,
) -> crate::Result<()> {
match self {
FtsStore::Store(store) => store.fts_remove(account_id, collection, document_ids).await,

View file

@ -21,6 +21,8 @@
* for more details.
*/
use roaring::RoaringBitmap;
use crate::Store;
pub mod blob;
@ -45,3 +47,78 @@ impl Store {
}
}
}
// `len` is informational here (the `()` wildcard reports 0 while matching
// every id), so an `is_empty` companion would be misleading.
#[allow(clippy::len_without_is_empty)]
/// A set of document ids that can be bounded, tested for membership and
/// iterated.
///
/// Implementations below: `RoaringBitmap` (exact set), `Vec<u32>`
/// (binary-searched list) and `()` (wildcard matching every id).
/// `min` is inclusive; `max` is exclusive (largest id plus one, or 0
/// when the set is empty).
pub trait DocumentSet: Sync + Send {
    /// Smallest id in the set, or 0 when empty.
    fn min(&self) -> u32;
    /// One past the largest id in the set, or 0 when empty.
    fn max(&self) -> u32;
    /// Whether `id` belongs to the set.
    fn contains(&self, id: u32) -> bool;
    /// Number of ids (0 for the `()` wildcard).
    fn len(&self) -> usize;
    /// Iterator over the ids (empty for the `()` wildcard).
    fn iterate(&self) -> impl Iterator<Item = u32>;
}
/// `DocumentSet` backed by an exact bitmap of document ids.
impl DocumentSet for RoaringBitmap {
    fn min(&self) -> u32 {
        // `self.min()` resolves to RoaringBitmap's inherent method
        // (inherent methods take precedence over this trait method).
        self.min().unwrap_or(0)
    }
    fn max(&self) -> u32 {
        // Exclusive upper bound: largest id + 1, or 0 when empty.
        self.max().map(|m| m + 1).unwrap_or(0)
    }
    fn contains(&self, id: u32) -> bool {
        self.contains(id)
    }
    fn len(&self) -> usize {
        // Bitmap cardinality (u64) narrowed to usize.
        self.len() as usize
    }
    fn iterate(&self) -> impl Iterator<Item = u32> {
        self.iter()
    }
}
/// `DocumentSet` backed by a vector of document ids.
///
/// NOTE(review): `contains` relies on a binary search, so the vector is
/// assumed to be sorted in ascending order — confirm at call sites.
impl DocumentSet for Vec<u32> {
    fn min(&self) -> u32 {
        // First element is the smallest id; 0 for an empty vector.
        self.first().map_or(0, |&id| id)
    }
    fn max(&self) -> u32 {
        // Exclusive upper bound: last id + 1, or 0 for an empty vector.
        self.last().map_or(0, |&id| id + 1)
    }
    fn contains(&self, id: u32) -> bool {
        self.binary_search(&id).is_ok()
    }
    fn len(&self) -> usize {
        self.as_slice().len()
    }
    fn iterate(&self) -> impl Iterator<Item = u32> {
        self.as_slice().iter().copied()
    }
}
/// The unit type acts as a wildcard set: it reports membership for every
/// id across the full `u32` range while iterating nothing and exposing a
/// length of zero, letting callers pass `()` to mean "no restriction".
impl DocumentSet for () {
    fn min(&self) -> u32 {
        u32::MIN
    }
    fn max(&self) -> u32 {
        u32::MAX
    }
    fn contains(&self, _: u32) -> bool {
        // Every id is considered a member.
        true
    }
    fn len(&self) -> usize {
        // Informational only; the wildcard enumerates no concrete ids.
        0
    }
    fn iterate(&self) -> impl Iterator<Item = u32> {
        std::iter::empty()
    }
}

View file

@ -27,13 +27,16 @@ use roaring::RoaringBitmap;
use crate::{
write::{
key::KeySerializer, now, AnyKey, AssignedIds, Batch, BitmapClass, BitmapHash, ReportClass,
ValueClass,
key::{DeserializeBigEndian, KeySerializer},
now, AnyClass, AnyKey, AssignedIds, Batch, BatchBuilder, BitmapClass, BitmapHash,
Operation, ReportClass, ValueClass, ValueOp,
},
BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN,
};
use super::DocumentSet;
#[cfg(feature = "test_mode")]
lazy_static::lazy_static! {
pub static ref BITMAPS: std::sync::Arc<parking_lot::Mutex<std::collections::HashMap<Vec<u8>, std::collections::HashSet<u32>>>> =
@ -143,7 +146,6 @@ impl Store {
pub async fn write(&self, batch: Batch) -> crate::Result<AssignedIds> {
#[cfg(feature = "test_mode")]
if std::env::var("PARANOID_WRITE").map_or(false, |v| v == "1") {
use crate::write::Operation;
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
@ -303,6 +305,80 @@ impl Store {
}
}
/// Bulk-deletes every key in `subspace` that belongs to one of the given
/// document ids, by scanning the account's key range and clearing the
/// matching keys in batches.
///
/// * `collection_offset` — when `Some(offset)`, keys are filtered to
///   `collection` by checking the byte located `offset` bytes before the
///   trailing big-endian document id; when `None`, the whole account
///   range is scanned with no collection filter.
/// * `document_ids` — any `DocumentSet`; `()` matches every document.
pub async fn delete_documents(
    &self,
    subspace: u8,
    account_id: u32,
    collection: u8,
    collection_offset: Option<usize>,
    document_ids: &impl DocumentSet,
) -> crate::Result<()> {
    // Serialize keys
    // NOTE(review): `collection + 1` / `account_id + 1` act as exclusive
    // range ends and would overflow at the maximum value — presumably
    // never reached in practice; confirm.
    let (from_key, to_key) = if collection_offset.is_some() {
        (
            KeySerializer::new(U32_LEN + 2)
                .write(account_id)
                .write(collection),
            KeySerializer::new(U32_LEN + 2)
                .write(account_id)
                .write(collection + 1),
        )
    } else {
        (
            KeySerializer::new(U32_LEN).write(account_id),
            KeySerializer::new(U32_LEN).write(account_id + 1),
        )
    };
    // Find keys to delete
    let mut delete_keys = Vec::new();
    self.iterate(
        // Keys only; values are not needed to decide deletion.
        IterateParams::new(
            AnyKey {
                subspace,
                key: from_key.finalize(),
            },
            AnyKey {
                subspace,
                key: to_key.finalize(),
            },
        )
        .no_values(),
        |key, _| {
            if collection_offset.map_or(true, |offset| {
                key.get(key.len() - U32_LEN - offset).copied() == Some(collection)
            }) {
                // The document id is the big-endian u32 key suffix.
                let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
                if document_ids.contains(document_id) {
                    delete_keys.push(key.to_vec());
                }
            }
            Ok(true)
        },
    )
    .await?;
    // Remove keys
    let mut batch = BatchBuilder::new();
    for key in delete_keys {
        // Flush in chunks of 1000 operations to bound batch size.
        if batch.ops.len() >= 1000 {
            self.write(std::mem::take(&mut batch).build()).await?;
        }
        batch.ops.push(Operation::Value {
            class: ValueClass::Any(AnyClass { subspace, key }),
            op: ValueOp::Clear,
        });
    }
    if !batch.is_empty() {
        self.write(batch.build()).await?;
    }
    Ok(())
}
pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
for subspace in [
SUBSPACE_BITMAP_ID,
@ -466,10 +542,7 @@ impl Store {
pub async fn blob_expire_all(&self) {
use utils::{BlobHash, BLOB_HASH_LEN};
use crate::{
write::{key::DeserializeBigEndian, BatchBuilder, BlobOp, Operation, ValueOp},
U64_LEN,
};
use crate::{write::BlobOp, U64_LEN};
// Delete all temporary hashes
let from_key = ValueKey {
@ -522,9 +595,7 @@ impl Store {
#[cfg(feature = "test_mode")]
pub async fn lookup_expire_all(&self) {
use crate::write::{
key::DeserializeBigEndian, BatchBuilder, LookupClass, Operation, ValueOp,
};
use crate::write::LookupClass;
// Delete all temporary counters
let from_key = ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![0u8])));
@ -649,31 +720,31 @@ impl Store {
match key[5] {
BM_DOCUMENT_IDS => {
eprint!("Found document ids bitmap");
print!("Found document ids bitmap");
}
BM_TAG => {
eprint!(
print!(
"Found tagged id {} bitmap",
key[7..].iter().next_leb128::<u32>().unwrap()
);
}
TAG_TEXT => {
eprint!(
print!(
"Found tagged text {:?} bitmap",
String::from_utf8_lossy(&key[7..])
);
}
TAG_STATIC => {
eprint!("Found tagged static {} bitmap", key[7]);
print!("Found tagged static {} bitmap", key[7]);
}
other => {
if other & BM_TEXT == BM_TEXT {
eprint!(
print!(
"Found text hash {:?} bitmap",
String::from_utf8_lossy(&key[7..])
);
} else {
eprint!("Found unknown bitmap");
print!("Found unknown bitmap");
}
}
}

View file

@ -35,6 +35,7 @@ use nlp::{
use crate::{
backend::MAX_TOKEN_LENGTH,
dispatch::DocumentSet,
write::{
hash::TokenType, key::DeserializeBigEndian, BatchBuilder, BitmapHash, MaybeDynamicId,
Operation, ValueClass, ValueOp,
@ -245,7 +246,7 @@ impl Store {
&self,
account_id: u32,
collection: u8,
document_ids: &[u32],
document_ids: &impl DocumentSet,
) -> crate::Result<()> {
// Find keys to delete
let mut delete_keys: AHashMap<u32, Vec<ValueClass<MaybeDynamicId>>> = AHashMap::new();
@ -273,7 +274,7 @@ impl Store {
.no_values(),
|key, _| {
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if document_ids.contains(&document_id) {
if document_ids.contains(document_id) {
let mut hash = [0u8; 8];
let (hash, len) = match key.len() - (U32_LEN * 2) - 1 {
9 => {

View file

@ -34,8 +34,8 @@ use crate::{
};
use super::{
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, FtsQueueClass, LookupClass,
QueueClass, ReportClass, ReportEvent, ResolveId, TagValue, ValueClass,
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass,
ReportEvent, ResolveId, TagValue, ValueClass,
};
pub struct KeySerializer {
@ -282,19 +282,12 @@ impl<T: ResolveId> ValueClass<T> {
.write(account_id)
.write(collection)
.write(document_id),
ValueClass::FtsQueue(index) => match index {
FtsQueueClass::Insert { seq, hash } => serializer
.write(*seq)
.write(account_id)
.write(collection)
.write(document_id)
.write::<&[u8]>(hash.as_ref()),
FtsQueueClass::Delete { seq } => serializer
.write(*seq)
.write(account_id)
.write(collection)
.write(document_id),
},
ValueClass::FtsQueue(queue) => serializer
.write(queue.seq)
.write(account_id)
.write(collection)
.write(document_id)
.write::<&[u8]>(queue.hash.as_ref()),
ValueClass::Blob(op) => match op {
BlobOp::Reserve { hash, until } => serializer
.write(account_id)
@ -389,6 +382,7 @@ impl<T: ResolveId> ValueClass<T> {
serializer.write(2u8).write(*expires).write(*id)
}
},
ValueClass::Any(any) => serializer.write(any.key.as_slice()),
}
.finalize()
}
@ -566,6 +560,7 @@ impl<T> ValueClass<T> {
QueueClass::QuotaCount(v) | QueueClass::QuotaSize(v) => v.len(),
},
ValueClass::Report(_) => U64_LEN * 2 + 1,
ValueClass::Any(v) => v.key.len(),
}
}
@ -606,6 +601,7 @@ impl<T> ValueClass<T> {
QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_) => SUBSPACE_QUOTA,
},
ValueClass::Report(_) => SUBSPACE_REPORT_OUT,
ValueClass::Any(any) => any.subspace,
}
}

View file

@ -170,12 +170,19 @@ pub enum ValueClass<T> {
Config(Vec<u8>),
Queue(QueueClass),
Report(ReportClass),
Any(AnyClass),
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum FtsQueueClass {
Insert { seq: u64, hash: BlobHash },
Delete { seq: u64 },
pub struct FtsQueueClass {
pub seq: u64,
pub hash: BlobHash,
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub struct AnyClass {
pub subspace: u8,
pub key: Vec<u8>,
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]

View file

@ -43,7 +43,7 @@ impl SimpleCron {
.with_ymd_and_hms(now.year(), now.month(), now.day(), *hour, *minute, 0)
.earliest()
.unwrap_or_else(|| now - TimeDelta::try_seconds(1).unwrap_or_default());
if next < now {
if next <= now {
next + TimeDelta::try_days(1).unwrap_or_default()
} else {
next
@ -54,7 +54,7 @@ impl SimpleCron {
.with_ymd_and_hms(now.year(), now.month(), now.day(), *hour, *minute, 0)
.earliest()
.unwrap_or_else(|| now - TimeDelta::try_seconds(1).unwrap_or_default());
if next < now {
if next <= now {
next + TimeDelta::try_days(
(7 - now.weekday().number_from_monday() + *day).into(),
)
@ -68,7 +68,7 @@ impl SimpleCron {
.with_ymd_and_hms(now.year(), now.month(), now.day(), now.hour(), *minute, 0)
.earliest()
.unwrap_or_else(|| now - TimeDelta::try_seconds(1).unwrap_or_default());
if next < now {
if next <= now {
next + TimeDelta::try_hours(1).unwrap_or_default()
} else {
next

View file

@ -51,6 +51,16 @@ impl SnowflakeIdGenerator {
}
}
#[inline(always)]
/// Returns the snowflake id that corresponds to `period` ago.
///
/// The elapsed time since the generator's epoch, minus `period`, is
/// shifted into the timestamp bits (sequence and node-id bits are left
/// zero). Returns `None` when the system clock reads before the epoch or
/// when less than `period` has elapsed since it.
pub fn past_id(&self, period: Duration) -> Option<u64> {
    let millis_ago = self
        .epoch
        .elapsed()
        .ok()?
        .checked_sub(period)?
        .as_millis() as u64;
    Some(millis_ago << (SEQUENCE_LEN + NODE_ID_LEN))
}
#[inline(always)]
pub fn generate(&self) -> Option<u64> {
let elapsed = self.epoch.elapsed().ok()?.as_millis() as u64;
let sequence = self.sequence.fetch_add(1, Ordering::Relaxed);

View file

@ -46,7 +46,11 @@ use reqwest::header;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use smtp::core::{SmtpSessionManager, SMTP};
use store::Stores;
use store::{
roaring::RoaringBitmap,
write::{key::DeserializeBigEndian, AnyKey},
IterateParams, Stores, SUBSPACE_PROPERTY,
};
use tokio::sync::{mpsc, watch};
use utils::config::Config;
@ -69,6 +73,7 @@ pub mod email_set;
pub mod email_submission;
pub mod event_source;
pub mod mailbox;
pub mod purge;
pub mod push_subscription;
pub mod quota;
pub mod sieve_script;
@ -234,6 +239,12 @@ throttle = "500ms"
throttle = "500ms"
attempts.interval = "500ms"
[jmap.email]
auto-expunge = "1s"
[jmap.protocol.changes]
max-history = "1s"
[store."auth"]
type = "sqlite"
path = "{TMP}/auth.db"
@ -326,6 +337,7 @@ pub async fn jmap_tests() {
quota::test(&mut params).await;
crypto::test(&mut params).await;
blob::test(&mut params).await;
purge::test(&mut params).await;
if delete {
params.temp_dir.delete();
@ -390,6 +402,9 @@ pub async fn assert_is_empty(server: Arc<JMAP>) {
// Wait for pending FTS index tasks
wait_for_index(&server).await;
// Purge accounts
emails_purge_tombstoned(&server).await;
// Assert is empty
server
.core
@ -399,6 +414,38 @@ pub async fn assert_is_empty(server: Arc<JMAP>) {
.await;
}
/// Test helper: runs tombstone purging for every account found in the
/// property subspace.
///
/// Scans `SUBSPACE_PROPERTY` across its full key range, collects the
/// distinct account ids (the big-endian u32 prefix of each key), then
/// awaits `JMAP::emails_purge_tombstoned` for each account. Panics on
/// any store error, as is conventional in tests.
pub async fn emails_purge_tombstoned(server: &JMAP) {
    let mut account_ids = RoaringBitmap::new();
    server
        .core
        .storage
        .data
        .iterate(
            IterateParams::new(
                AnyKey {
                    subspace: SUBSPACE_PROPERTY,
                    key: vec![0u8],
                },
                AnyKey {
                    subspace: SUBSPACE_PROPERTY,
                    key: vec![u8::MAX, u8::MAX, u8::MAX, u8::MAX],
                },
            )
            .no_values(),
            |key, _| {
                // Account id is the big-endian u32 key prefix.
                account_ids.insert(key.deserialize_be_u32(0).unwrap());
                Ok(true)
            },
        )
        .await
        .unwrap();
    for account_id in account_ids {
        server.emails_purge_tombstoned(account_id).await.unwrap();
    }
}
async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
// Load and parse config
let temp_dir = TempDir::new("jmap_tests", delete_if_exists);

200
tests/src/jmap/purge.rs Normal file
View file

@ -0,0 +1,200 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use ahash::AHashSet;
use jmap::{
mailbox::{INBOX_ID, JUNK_ID, TRASH_ID},
JMAP,
};
use jmap_proto::types::{collection::Collection, id::Id, property::Property};
use store::{
write::{key::DeserializeBigEndian, TagValue},
IterateParams, LogKey, U32_LEN, U64_LEN,
};
use super::JMAPTest;
/// Integration test for account purging: verifies that `purge_account`
/// auto-expunges Junk/Trash messages and expires old changelog entries
/// (the test config sets `auto-expunge = "1s"` and changes
/// `max-history = "1s"`).
pub async fn test(params: &mut JMAPTest) {
    println!("Running purge tests...");
    let server = params.server.clone();
    let client = &mut params.client;
    let inbox_id = Id::from(INBOX_ID).to_string();
    let trash_id = Id::from(TRASH_ID).to_string();
    let junk_id = Id::from(JUNK_ID).to_string();
    // Create test messages
    client.set_default_account_id(Id::from(1u64));
    let mut message_ids = Vec::new();
    let mut pass = 0;
    let mut changes = AHashSet::new();
    // Two passes: pass 1 imports one message per folder, snapshots the
    // changelog and sleeps 1s so that batch ages past the purge window;
    // pass 2 imports three fresh messages and exits the loop.
    loop {
        pass += 1;
        for folder_id in [&inbox_id, &trash_id, &junk_id] {
            message_ids.push(
                client
                    .email_import(
                        format!(
                            concat!(
                                "From: bill@example.com\r\n",
                                "To: jdoe@example.com\r\n",
                                "Subject: TPS Report #{} {}\r\n",
                                "\r\n",
                                "I'm going to need those TPS reports ASAP. ",
                                "So, if you could do that, that'd be great."
                            ),
                            pass, folder_id
                        )
                        .into_bytes(),
                        [folder_id],
                        None::<Vec<&str>>,
                        None,
                    )
                    .await
                    .unwrap()
                    .take_id(),
            );
        }
        if pass == 1 {
            changes = get_changes(&server).await;
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        } else {
            break;
        }
    }
    // Make sure both messages and changes are present
    assert_eq!(
        server
            .get_document_ids(1, Collection::Email)
            .await
            .unwrap()
            .unwrap()
            .len(),
        6
    );
    // Purge junk/trash messages and old changes
    server.purge_account(1).await;
    // Only 4 messages should remain
    // (the pass-1 Junk and Trash messages aged past 1s and were expunged;
    // both Inbox messages and the fresh pass-2 Junk/Trash ones survive).
    assert_eq!(
        server
            .get_document_ids(1, Collection::Email)
            .await
            .unwrap()
            .unwrap()
            .len(),
        4
    );
    assert_eq!(
        server
            .get_tag(
                1,
                Collection::Email,
                Property::MailboxIds,
                TagValue::Id(INBOX_ID)
            )
            .await
            .unwrap()
            .unwrap()
            .len(),
        2
    );
    assert_eq!(
        server
            .get_tag(
                1,
                Collection::Email,
                Property::MailboxIds,
                TagValue::Id(TRASH_ID)
            )
            .await
            .unwrap()
            .unwrap()
            .len(),
        1
    );
    assert_eq!(
        server
            .get_tag(
                1,
                Collection::Email,
                Property::MailboxIds,
                TagValue::Id(JUNK_ID)
            )
            .await
            .unwrap()
            .unwrap()
            .len(),
        1
    );
    // Compare changes
    // Every changelog entry captured before the sleep must be gone now.
    let new_changes = get_changes(&server).await;
    assert!(!changes.is_empty());
    assert!(!new_changes.is_empty());
    for change in changes {
        assert!(
            !new_changes.contains(&change),
            "Change {:?} was not purged",
            change
        );
    }
}
/// Scans the entire changelog key space and returns every entry as a
/// `(change_id, collection)` pair, asserting along the way that all
/// entries belong to account 1 (the only account this test writes to).
async fn get_changes(server: &JMAP) -> AHashSet<(u64, u8)> {
    let mut entries = AHashSet::new();

    // Full-range bounds covering every possible changelog key.
    let from_key = LogKey {
        account_id: 0,
        collection: 0,
        change_id: 0,
    };
    let to_key = LogKey {
        account_id: u32::MAX,
        collection: u8::MAX,
        change_id: u64::MAX,
    };

    server
        .core
        .storage
        .data
        .iterate(
            IterateParams::new(from_key, to_key).ascending().no_values(),
            |key, _| {
                // The test only creates data for account 1.
                assert_eq!(key.deserialize_be_u32(0).unwrap(), 1);
                // Key layout: account_id (u32 BE) | collection (u8) | ... | change_id (u64 BE).
                let change_id = key.deserialize_be_u64(key.len() - U64_LEN).unwrap();
                let collection = key[U32_LEN];
                entries.insert((change_id, collection));
                Ok(true)
            },
        )
        .await
        .unwrap();
    entries
}

View file

@ -22,8 +22,8 @@
*/
use crate::jmap::{
assert_is_empty, delivery::SmtpConnection, jmap_raw_request, mailbox::destroy_all_mailboxes,
test_account_login,
assert_is_empty, delivery::SmtpConnection, emails_purge_tombstoned, jmap_raw_request,
mailbox::destroy_all_mailboxes, test_account_login,
};
use directory::backend::internal::manage::ManageDirectory;
use jmap::{blob::upload::DISABLE_UPLOAD_QUOTA, mailbox::INBOX_ID};
@ -191,6 +191,7 @@ pub async fn test(params: &mut JMAPTest) {
for message_id in message_ids {
client.email_destroy(&message_id).await.unwrap();
}
emails_purge_tombstoned(&server).await;
assert_eq!(
server
.get_used_quota(account_id.document_id())
@ -238,6 +239,7 @@ pub async fn test(params: &mut JMAPTest) {
for message_id in message_ids {
client.email_destroy(&message_id).await.unwrap();
}
emails_purge_tombstoned(&server).await;
assert_eq!(
server
.get_used_quota(account_id.document_id())
@ -300,6 +302,7 @@ pub async fn test(params: &mut JMAPTest) {
for message_id in message_ids {
client.email_destroy(&message_id).await.unwrap();
}
emails_purge_tombstoned(&server).await;
assert_eq!(
server
.get_used_quota(account_id.document_id())