diff --git a/Cargo.lock b/Cargo.lock index 201a47a7..f15775a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4308,6 +4308,7 @@ dependencies = [ "jmap", "jmap_proto", "managesieve", + "migration", "pop3", "services", "smtp", @@ -4426,6 +4427,32 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c797b9d6bb23aab2fc369c65f871be49214f5c759af65bde26ffaaa2b646b492" +[[package]] +name = "migration" +version = "0.11.7" +dependencies = [ + "base64 0.22.1", + "bincode 1.3.3", + "common", + "compact_str", + "directory", + "email", + "jmap_proto", + "lz4_flex", + "mail-auth", + "mail-parser 0.11.0", + "nlp", + "rkyv", + "serde", + "serde_json", + "sieve-rs", + "smtp", + "store", + "tokio", + "trc", + "utils", +] + [[package]] name = "mime" version = "0.3.17" @@ -7090,6 +7117,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7d3950ab75b03c52f2f13fd52aab91c9d62698b231b67240e85c3ef5301e63e" dependencies = [ "rkyv", + "serde", ] [[package]] @@ -7457,6 +7485,7 @@ dependencies = [ "mail-parser 0.11.0", "mail-send", "managesieve", + "migration", "nlp", "num_cpus", "pop3", diff --git a/Cargo.toml b/Cargo.toml index 1e4cad1c..dd77cf83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ members = [ "crates/utils", "crates/common", "crates/trc", + "crates/migration", "crates/cli", "tests", ] diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 5f6b3fd5..ce4b41b8 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -38,7 +38,6 @@ use std::{ sync::{Arc, atomic::AtomicBool}, time::Duration, }; -use store::roaring::RoaringBitmap; use tinyvec::TinyVec; use tokio::sync::{Notify, Semaphore, mpsc}; use tokio_rustls::TlsConnector; @@ -72,6 +71,8 @@ pub static USER_AGENT: &str = "Stalwart/1.0.0"; pub static DAEMON_NAME: &str = concat!("Stalwart v", env!("CARGO_PKG_VERSION"),); pub static PROD_ID: &str = "-//Stalwart Labs Ltd.//Stalwart Server//EN"; +pub const DATABASE_SCHEMA_VERSION: u32 = 1; + pub const LONG_1D_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24); pub const LONG_1Y_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365); @@ -777,41 +778,6 @@ impl DavName { } } -impl MessageStoreCache { - pub fn assign_thread_id(&self, thread_name: &[u8], message_id: &[u8]) -> u32 { - let mut bytes = Vec::with_capacity(thread_name.len() + message_id.len()); - bytes.extend_from_slice(thread_name); - bytes.extend_from_slice(message_id); - let mut hash = store::gxhash::gxhash32(&bytes, 791120); - - if self.emails.items.is_empty() { - return hash; - } - - // Naive pass, assume hash is unique - let mut threads_ids = RoaringBitmap::new(); - let mut is_unique_hash = true; - for item in self.emails.items.iter() { - if is_unique_hash && item.thread_id != hash { - is_unique_hash = false; - } - threads_ids.insert(item.thread_id); - } - - if is_unique_hash { - hash - } else { - for _ in 0..u32::MAX { - hash = hash.wrapping_add(1); - if !threads_ids.contains(hash) { - return hash; - } - } - hash - } - } -} - impl CacheSwap { pub fn new(value: Arc) -> Self { Self(Arc::new(ArcSwap::new(value))) diff --git a/crates/common/src/manager/boot.rs b/crates/common/src/manager/boot.rs index b0275bcd..12afaae6 100644 --- a/crates/common/src/manager/boot.rs +++ b/crates/common/src/manager/boot.rs @@ -5,6 +5,7 @@ */ use std::{ + collections::BTreeMap, net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::Arc, @@ -152,7 +153,6 @@ impl BootManager { config.new_build_error("*", format!("Could not read configuration file: {err}")); } } - let cfg_local = config.keys.clone(); // Resolve environment macros config.resolve_macros(&["env"]).await; @@ -168,12 +168,33 @@ impl BootManager { // Load stores let mut stores = Stores::parse(&mut config).await; + let local_patterns = Patterns::parse(&mut config); + + // Build local keys and warn about database keys defined in the local configuration + let mut cfg_local = BTreeMap::new(); + let mut warn_keys = Vec::new(); + for (key, value) in &config.keys { + if !local_patterns.is_local_key(key) { + warn_keys.push(key.clone()); + } + cfg_local.insert(key.clone(), value.clone()); + } + for warn_key in warn_keys { + config.new_build_warning( + warn_key, + concat!( + "Database key defined in local configuration, this might cause issues. ", + "See https://stalw.art/docs/configuration/overview/#loc", + "al-and-database-settings" + ), + ); + } // Build manager let manager = ConfigManager { cfg_local: ArcSwap::from_pointee(cfg_local), cfg_local_path, - cfg_local_patterns: Patterns::parse(&mut config).into(), + cfg_local_patterns: local_patterns.into(), cfg_store: config .value("storage.data") .and_then(|id| stores.stores.get(id)) @@ -183,10 +204,24 @@ impl BootManager { // Extend configuration with settings stored in the db if !manager.cfg_store.is_none() { - manager - .extend_config(&mut config, "") + for (key, value) in manager + .db_list("", false) .await - .failed("Failed to read configuration"); + .failed("Failed to read database configuration") + { + if manager.cfg_local_patterns.is_local_key(&key) { + config.new_build_warning( + &key, + concat!( + "Local key defined in database, this might cause issues. ", + "See https://stalw.art/docs/configuration/overview/#loc", + "al-and-database-settings" + ), + ); + } + + config.keys.entry(key).or_insert(value); + } } // Parse telemetry @@ -213,25 +248,9 @@ impl BootManager { ))); } - // Generate a Cluster encryption key if missing - if config - .value("cluster.key") - .filter(|v| !v.is_empty()) - .is_none() - { - insert_keys.push(ConfigKey::from(( - "cluster.key", - rng() - .sample_iter(Alphanumeric) - .take(64) - .map(char::from) - .collect::(), - ))); - } - // Download Spam filter rules if missing // TODO remove this check in 1.0 - let mut update_webadmin = match config.value("version.spam-filter").and_then(|v| { + let update_webadmin = match config.value("version.spam-filter").and_then(|v| { if !v.is_empty() { Some(Semver::try_from(v)) } else { @@ -248,7 +267,6 @@ impl BootManager { .await; let _ = manager.clear_prefix("sieve.trusted.scripts.greylist").await; let _ = manager.clear_prefix("sieve.trusted.scripts.train").await; - //let _ = manager.clear_prefix("session.data.script").await; let _ = manager.clear("version.spam-filter").await; match manager.fetch_spam_rules().await { @@ -310,18 +328,6 @@ impl BootManager { } }; - // TODO remove key migration in 1.0 - for (old_key, new_key) in [ - ("lookup.default.hostname", "server.hostname"), - ("lookup.default.domain", "report.domain"), - ] { - if let (Some(old_value), None) = (config.value(old_key), config.value(new_key)) - { - insert_keys.push(ConfigKey::from((new_key, old_value))); - update_webadmin = true; - } - } - // Download webadmin if missing if let Some(blob_store) = config .value("storage.blob") diff --git a/crates/common/src/manager/config.rs b/crates/common/src/manager/config.rs index 69a1545c..3187cb8e 100644 --- a/crates/common/src/manager/config.rs +++ b/crates/common/src/manager/config.rs @@ -133,7 +133,7 @@ impl ConfigManager { Ok(grouped) } - async fn db_list( + pub async fn db_list( &self, prefix: &str, strip_prefix: bool, @@ -530,7 +530,6 @@ impl Patterns { Pattern::Include(MatchType::StartsWith( "authentication.fallback-admin.".to_string(), )), - Pattern::Exclude(MatchType::Equal("cluster.key".to_string())), Pattern::Include(MatchType::StartsWith("cluster.".to_string())), Pattern::Include(MatchType::Equal("storage.data".to_string())), Pattern::Include(MatchType::Equal("storage.blob".to_string())), diff --git a/crates/email/src/lib.rs b/crates/email/src/lib.rs index b596ba28..48a38f16 100644 --- a/crates/email/src/lib.rs +++ b/crates/email/src/lib.rs @@ -8,7 +8,6 @@ pub mod cache; pub mod identity; pub mod mailbox; pub mod message; -pub mod migration; pub mod push; pub mod sieve; pub mod submission; diff --git a/crates/email/src/message/copy.rs b/crates/email/src/message/copy.rs index 24f58cbe..30f831fa 100644 --- a/crates/email/src/message/copy.rs +++ b/crates/email/src/message/copy.rs @@ -4,6 +4,12 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use super::{ + index::{MAX_ID_LENGTH, MAX_SORT_FIELD_LENGTH, TrimTextValue, VisitText}, + ingest::{EmailIngest, IngestedEmail, ThreadResult}, + metadata::{MessageData, MessageMetadata}, +}; +use crate::mailbox::UidMailbox; use common::{Server, auth::ResourceToken, storage::index::ObjectIndexBuilder}; use jmap_proto::{ error::set::SetError, @@ -23,14 +29,6 @@ use store::{ }; use trc::AddContext; -use crate::{cache::MessageCacheFetch, mailbox::UidMailbox}; - -use super::{ - index::{MAX_ID_LENGTH, MAX_SORT_FIELD_LENGTH, TrimTextValue, VisitText}, - ingest::{EmailIngest, IngestedEmail, ThreadResult}, - metadata::{MessageData, MessageMetadata}, -}; - pub trait EmailCopy: Sync + Send { #[allow(clippy::too_many_arguments)] fn copy_message( @@ -145,10 +143,10 @@ impl EmailCopy for Server { ThreadResult::Id(thread_id) => (false, thread_id), ThreadResult::Create => ( true, - self.get_cached_messages(account_id) + self.store() + .assign_document_ids(account_id, Collection::Thread, 1) .await - .caused_by(trc::location!())? - .assign_thread_id(subject.as_bytes(), message_id.as_bytes()), + .caused_by(trc::location!())?, ), ThreadResult::Skip => unreachable!(), }; diff --git a/crates/email/src/message/ingest.rs b/crates/email/src/message/ingest.rs index 2cb07a23..e3607dc0 100644 --- a/crates/email/src/message/ingest.rs +++ b/crates/email/src/message/ingest.rs @@ -330,13 +330,10 @@ impl EmailIngest for Server { ThreadResult::Id(thread_id) => thread_id, ThreadResult::Create => { log_thread_create = true; - self.get_cached_messages(account_id) + self.store() + .assign_document_ids(account_id, Collection::Thread, 1) .await .caused_by(trc::location!())? - .assign_thread_id( - subject.as_bytes(), - message_id.as_deref().unwrap_or_default().as_bytes(), - ) } ThreadResult::Skip => { // Duplicate message diff --git a/crates/email/src/message/metadata.rs b/crates/email/src/message/metadata.rs index ecd5e75d..609076ea 100644 --- a/crates/email/src/message/metadata.rs +++ b/crates/email/src/message/metadata.rs @@ -24,7 +24,7 @@ use std::{borrow::Cow, collections::VecDeque}; use store::{SERIALIZE_MESSAGE_DATA_V1, SERIALIZE_MESSAGE_METADATA_V1, SerializedVersion}; use utils::BlobHash; -#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)] +#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Default)] pub struct MessageData { pub mailboxes: Vec, pub keywords: Vec, diff --git a/crates/email/src/migration/email.rs b/crates/email/src/migration/email.rs deleted file mode 100644 index ae03cee8..00000000 --- a/crates/email/src/migration/email.rs +++ /dev/null @@ -1,14 +0,0 @@ -use common::Server; -use jmap_proto::types::collection::Collection; - -async fn migrate_email(server: &Server, account_id: u32) -> trc::Result<()> { - // Obtain email ids - let document_ids = server - .get_document_ids(account_id, Collection::Email) - .await? - .unwrap_or_default(); - - //TODO remove tombstones - - Ok(()) -} diff --git a/crates/email/src/migration/mod.rs b/crates/email/src/migration/mod.rs deleted file mode 100644 index aa5f45d4..00000000 --- a/crates/email/src/migration/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod email; diff --git a/crates/email/src/sieve/ingest.rs b/crates/email/src/sieve/ingest.rs index c26500f3..ee147573 100644 --- a/crates/email/src/sieve/ingest.rs +++ b/crates/email/src/sieve/ingest.rs @@ -19,16 +19,17 @@ use common::{ use directory::{Permission, QueryBy}; use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, property::Property}; use mail_parser::MessageParser; -use rkyv::util::AlignedVec; use sieve::{Envelope, Event, Input, Mailbox, Recipient, Sieve}; use std::future::Future; use std::{borrow::Cow, sync::Arc}; use store::{ - Serialize, SerializeInfallible, + Deserialize, Serialize, SerializeInfallible, ahash::AHashMap, dispatch::lookup::KeyValue, query::Filter, - write::{Archiver, BatchBuilder, BlobOp}, + write::{ + AlignedBytes, Archiver, BatchBuilder, BlobOp, UnversionedArchive, UnversionedArchiver, + }, }; use trc::{AddContext, SieveEvent}; use utils::config::utils::ParseValue; @@ -636,9 +637,10 @@ impl SieveScriptIngest for Server { // Obtain the precompiled script if let Some(script) = script_bytes.get(script_offset..).and_then(|bytes| { - let mut abytes = AlignedVec::<16>::with_capacity(bytes.len()); - abytes.extend_from_slice(bytes); - rkyv::from_bytes::(&abytes).ok() + as Deserialize>::deserialize(bytes) + .ok()? + .deserialize::() + .ok() }) { Ok(CompiledScript { script, @@ -657,9 +659,8 @@ impl SieveScriptIngest for Server { ) { Ok(sieve) => { // Store updated compiled sieve script - let compiled_bytes = rkyv::to_bytes::(&sieve) - .map_err(Into::into) - .caused_by(trc::location!())?; + let sieve = UnversionedArchiver::new(sieve); + let compiled_bytes = sieve.serialize().caused_by(trc::location!())?; let mut updated_sieve_bytes = Vec::with_capacity(script_offset + compiled_bytes.len()); updated_sieve_bytes.extend_from_slice(&script_bytes[0..script_offset]); @@ -700,7 +701,7 @@ impl SieveScriptIngest for Server { .caused_by(trc::location!())?; Ok(CompiledScript { - script: sieve, + script: sieve.into_inner(), name: new_archive.into_inner().name, hash, }) diff --git a/crates/groupware/src/cache/mod.rs b/crates/groupware/src/cache/mod.rs index 46970101..e4bb68e8 100644 --- a/crates/groupware/src/cache/mod.rs +++ b/crates/groupware/src/cache/mod.rs @@ -289,7 +289,7 @@ impl GroupwareCache for Server { let mut batch = BatchBuilder::new(); let document_id = self .store() - .assign_document_ids(account_id, Collection::Calendar, 1) + .assign_document_ids(account_id, Collection::Calendar, 3) .await?; Calendar { name: name.clone(), diff --git a/crates/jmap-proto/src/types/value.rs b/crates/jmap-proto/src/types/value.rs index 44726d10..404dad01 100644 --- a/crates/jmap-proto/src/types/value.rs +++ b/crates/jmap-proto/src/types/value.rs @@ -298,6 +298,7 @@ impl Value { pub fn as_uint(&self) -> Option { match self { Value::UnsignedInt(u) => Some(*u), + Value::Id(id) => Some(*id.as_ref()), _ => None, } } diff --git a/crates/jmap/src/sieve/set.rs b/crates/jmap/src/sieve/set.rs index 72bbdb7c..b722a4ed 100644 --- a/crates/jmap/src/sieve/set.rs +++ b/crates/jmap/src/sieve/set.rs @@ -32,10 +32,10 @@ use jmap_proto::{ use rand::distr::Alphanumeric; use sieve::compiler::ErrorType; use store::{ - BlobClass, + BlobClass, Serialize, query::Filter, rand::{Rng, rng}, - write::{Archive, BatchBuilder}, + write::{Archive, BatchBuilder, UnversionedArchiver}, }; use trc::AddContext; @@ -468,7 +468,7 @@ impl SieveScriptSet for Server { match self.core.sieve.untrusted_compiler.compile(&bytes) { Ok(script) => { changes.size = bytes.len() as u32; - bytes.extend(rkyv::to_bytes::(&script)?.iter()); + bytes.extend(UnversionedArchiver::new(script).serialize().caused_by(trc::location!())?); bytes.into() } Err(err) => { diff --git a/crates/jmap/src/vacation/set.rs b/crates/jmap/src/vacation/set.rs index 96e4b775..13b80188 100644 --- a/crates/jmap/src/vacation/set.rs +++ b/crates/jmap/src/vacation/set.rs @@ -27,7 +27,10 @@ use jmap_proto::{ use mail_builder::MessageBuilder; use mail_parser::decoders::html::html_to_text; use std::future::Future; -use store::write::BatchBuilder; +use store::{ + Serialize, + write::{BatchBuilder, UnversionedArchiver}, +}; use trc::AddContext; pub trait VacationResponseSet: Sync + Send { @@ -436,7 +439,11 @@ impl VacationResponseSet for Server { obj.size = script.len() as u32; // Serialize script - script.extend(rkyv::to_bytes::(&compiled_script)?.iter()); + script.extend( + UnversionedArchiver::new(compiled_script) + .serialize() + .caused_by(trc::location!())?, + ); Ok(script) } diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 0af9edcb..5fee2232 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -33,6 +33,7 @@ groupware = { path = "../groupware" } services = { path = "../services" } trc = { path = "../trc" } utils = { path = "../utils" } +migration = { path = "../migration" } tokio = { version = "1.23", features = ["full"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/crates/main/src/main.rs b/crates/main/src/main.rs index 870d59e0..69562187 100644 --- a/crates/main/src/main.rs +++ b/crates/main/src/main.rs @@ -6,8 +6,6 @@ #![warn(clippy::large_futures)] -use std::time::Duration; - use common::{config::server::ServerProtocol, core::BuildServer, manager::boot::BootManager}; use http::HttpSessionManager; use imap::core::ImapSessionManager; @@ -15,6 +13,7 @@ use managesieve::core::ManageSieveSessionManager; use pop3::Pop3SessionManager; use services::{StartServices, broadcast::subscriber::spawn_broadcast_subscriber}; use smtp::{StartQueueManager, core::SmtpSessionManager}; +use std::time::Duration; use trc::Collector; use utils::wait_for_shutdown; @@ -30,6 +29,16 @@ async fn main() -> std::io::Result<()> { // Load config and apply macros let mut init = Box::pin(BootManager::init()).await; + // Migrate database + if let Err(err) = migration::try_migrate(&init.inner.build_server()).await { + trc::event!( + Server(trc::ServerEvent::StartupError), + Details = "Failed to migrate database, aborting startup.", + Reason = err, + ); + return Ok(()); + } + // Init services init.start_services().await; init.start_queue_manager(); @@ -38,13 +47,9 @@ async fn main() -> std::io::Result<()> { init.config.log_errors(); init.config.log_warnings(); - { - let server = init.inner.build_server(); - - // Log licensing information - #[cfg(feature = "enterprise")] - server.log_license_details(); - } + // Log licensing information + #[cfg(feature = "enterprise")] + init.inner.build_server().log_license_details(); // Spawn servers let (shutdown_tx, shutdown_rx) = init.servers.spawn(|server, acceptor, shutdown_rx| { diff --git a/crates/managesieve/src/op/putscript.rs b/crates/managesieve/src/op/putscript.rs index a80beb1d..846949ac 100644 --- a/crates/managesieve/src/op/putscript.rs +++ b/crates/managesieve/src/op/putscript.rs @@ -12,7 +12,11 @@ use imap_proto::receiver::Request; use jmap_proto::types::{collection::Collection, property::Property}; use sieve::compiler::ErrorType; use std::time::Instant; -use store::{query::Filter, write::BatchBuilder}; +use store::{ + Serialize, + query::Filter, + write::{BatchBuilder, UnversionedArchiver}, +}; use trc::AddContext; impl Session { @@ -75,10 +79,9 @@ impl Session { { Ok(compiled_script) => { script_bytes.extend( - rkyv::to_bytes::(&compiled_script) - .map_err(Into::into) - .caused_by(trc::location!())? - .iter(), + UnversionedArchiver::new(compiled_script) + .serialize() + .caused_by(trc::location!())?, ); } Err(err) => { diff --git a/crates/migration/Cargo.toml b/crates/migration/Cargo.toml new file mode 100644 index 00000000..85c6f941 --- /dev/null +++ b/crates/migration/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "migration" +version = "0.11.7" +edition = "2024" +resolver = "2" + +[dependencies] +utils = { path = "../utils" } +nlp = { path = "../nlp" } +store = { path = "../store" } +trc = { path = "../trc" } +jmap_proto = { path = "../jmap-proto" } +common = { path = "../common" } +email = { path = "../email" } +directory = { path = "../directory" } +smtp = { path = "../smtp" } +mail-parser = { version = "0.11", features = ["full_encoding"] } +mail-auth = { version = "0.7", features = ["rkyv"] } +sieve-rs = { version = "0.7", features = ["rkyv"] } +tokio = { version = "1.23", features = ["net", "macros"] } +serde = { version = "1.0", features = ["derive"]} +serde_json = "1.0" +rkyv = { version = "0.8.10", features = ["little_endian"] } +compact_str = "0.9.0" +bincode = "1.3.3" +lz4_flex = { version = "0.11", default-features = false } +base64 = "0.22" + +[features] +test_mode = [] +enterprise = [] + +[dev-dependencies] +tokio = { version = "1.23", features = ["full"] } diff --git a/crates/migration/src/changelog.rs b/crates/migration/src/changelog.rs new file mode 100644 index 00000000..b9670c1b --- /dev/null +++ b/crates/migration/src/changelog.rs @@ -0,0 +1,32 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use store::{ + SUBSPACE_LOGS, U64_LEN, + write::{AnyKey, key::KeySerializer}, +}; +use trc::AddContext; + +pub(crate) async fn reset_changelog(server: &Server) -> trc::Result<()> { + // Delete changes + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_LOGS, + key: KeySerializer::new(U64_LEN).write(0u8).finalize(), + }, + AnyKey { + subspace: SUBSPACE_LOGS, + key: KeySerializer::new(U64_LEN) + .write(&[u8::MAX; 16][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!()) +} diff --git a/crates/migration/src/email.rs b/crates/migration/src/email.rs new file mode 100644 index 00000000..1fc0895a --- /dev/null +++ b/crates/migration/src/email.rs @@ -0,0 +1,574 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::{LegacyBincode, get_properties}; +use common::Server; +use email::{ + mailbox::{TOMBSTONE_ID, UidMailbox}, + message::{ + index::{MAX_ID_LENGTH, VisitText}, + metadata::{ + MessageData, MessageMetadata, MessageMetadataContents, MessageMetadataPart, + MetadataPartType, + }, + }, +}; +use jmap_proto::types::{collection::Collection, keyword::*, property::Property}; +use mail_parser::{ + Address, Attribute, ContentType, DateTime, Encoding, Header, HeaderName, HeaderValue, Received, +}; +use std::{borrow::Cow, collections::VecDeque}; +use store::{ + BitmapKey, Deserialize, SUBSPACE_BITMAP_TAG, SUBSPACE_INDEXES, SUBSPACE_PROPERTY, Serialize, + U64_LEN, ValueKey, + ahash::AHashMap, + write::{ + AlignedBytes, AnyKey, Archive, Archiver, BatchBuilder, BitmapClass, TagValue, ValueClass, + key::KeySerializer, + }, +}; +use trc::AddContext; +use utils::{BlobHash, codec::leb128::Leb128Iterator}; + +const BM_MARKER: u8 = 1 << 7; + +pub(crate) async fn migrate_emails(server: &Server, account_id: u32) -> trc::Result { + // Obtain email ids + let mut message_ids = server + .get_document_ids(account_id, Collection::Email) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_emails = message_ids.len(); + if num_emails == 0 { + return Ok(0); + } + let tombstoned_ids = server + .store() + .get_bitmap(BitmapKey { + account_id, + collection: Collection::Email.into(), + class: BitmapClass::Tag { + field: Property::MailboxIds.into(), + value: TagValue::Id(TOMBSTONE_ID), + }, + document_id: 0, + }) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + let mut message_data: AHashMap = AHashMap::with_capacity(num_emails as usize); + let mut did_migrate = false; + + // Obtain mailboxes + for (message_id, uid_mailbox) in get_properties::( + server, + account_id, + Collection::Email, + &(), + Property::MailboxIds, + ) + .await + .caused_by(trc::location!())? + { + message_data.entry(message_id).or_default().mailboxes = uid_mailbox.0; + } + + // Obtain keywords + for (message_id, keywords) in get_properties::( + server, + account_id, + Collection::Email, + &(), + Property::Keywords, + ) + .await + .caused_by(trc::location!())? + { + message_data.entry(message_id).or_default().keywords = keywords.0; + } + + // Obtain threadIds + for (message_id, thread_id) in get_properties::( + server, + account_id, + Collection::Email, + &(), + Property::ThreadId, + ) + .await + .caused_by(trc::location!())? + { + message_data.entry(message_id).or_default().thread_id = thread_id; + } + + // Obtain changeIds + /*for (message_id, change_id) in + get_properties::(server, account_id, Collection::Email, &(), Property::Cid) + .await.caused_by(trc::location!())? + { + message_data.entry(message_id).or_default().change_id = change_id; + }*/ + + // Write message data + for (message_id, data) in message_data { + if !tombstoned_ids.contains(message_id) { + message_ids.insert(message_id); + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email) + .update_document(message_id); + + for mailbox in &data.mailboxes { + batch.untag(Property::MailboxIds, TagValue::Id(mailbox.mailbox_id)); + } + + did_migrate = true; + + batch.set( + Property::Value, + Archiver::new(data) + .serialize() + .caused_by(trc::location!())?, + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + } + + // Migrate message metadata + for message_id in message_ids { + match server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Email.into(), + document_id: message_id, + class: ValueClass::Property(Property::BodyStructure.into()), + }) + .await + { + Ok(Some(legacy_metadata)) => { + let metadata = MessageMetadata::from_legacy(legacy_metadata.inner); + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Email) + .update_document(message_id); + + for header in metadata.root_part().headers.iter().rev() { + if matches!(header.name, HeaderName::MessageId) { + header.value.visit_text(|id| { + if id.len() < MAX_ID_LENGTH { + batch.index(Property::References, encode_message_id(id)); + } + }); + } + } + + batch.set( + Property::BodyStructure, + Archiver::new(metadata) + .serialize() + .caused_by(trc::location!())?, + ); + + did_migrate = true; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Email.into(), + document_id: message_id, + class: ValueClass::Property(Property::BodyStructure.into()), + }) + .await + .is_err() + { + return Err(err + .account_id(account_id) + .document_id(message_id) + .caused_by(trc::location!())); + } + } + } + } + + // Delete keyword bitmaps + for field in [ + u8::from(Property::Keywords), + u8::from(Property::Keywords) | BM_MARKER, + ] { + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_BITMAP_TAG, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Email)) + .write(field) + .finalize(), + }, + AnyKey { + subspace: SUBSPACE_BITMAP_TAG, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Email)) + .write(field) + .write(&[u8::MAX; 8][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + } + + // Delete messageId index, now in References + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_INDEXES, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Email)) + .write(u8::from(Property::MessageId)) + .finalize(), + }, + AnyKey { + subspace: SUBSPACE_INDEXES, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Email)) + .write(u8::from(Property::MessageId)) + .write(&[u8::MAX; 8][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + + // Delete values + for property in [ + Property::MailboxIds, + Property::Keywords, + Property::ThreadId, + Property::Cid, + ] { + let property: u8 = property.into(); + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_PROPERTY, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Email)) + .write(property) + .finalize(), + }, + AnyKey { + subspace: SUBSPACE_PROPERTY, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Email)) + .write(property) + .write(&[u8::MAX; 8][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + } + + // Increment document id counter + if did_migrate { + server + .store() + .assign_document_ids(account_id, Collection::Email, num_emails + 1) + .await + .caused_by(trc::location!())?; + Ok(num_emails) + } else { + Ok(0) + } +} + +pub trait FromLegacy { + fn from_legacy(legacy: LegacyMessageMetadata<'_>) -> Self; +} + +impl FromLegacy for MessageMetadata { + fn from_legacy(legacy: LegacyMessageMetadata<'_>) -> Self { + let mut metadata = MessageMetadata { + contents: vec![], + blob_hash: legacy.blob_hash, + size: legacy.size as u32, + received_at: legacy.received_at, + preview: legacy.preview, + has_attachments: legacy.has_attachments, + raw_headers: legacy.raw_headers, + }; + + let mut messages = VecDeque::from([legacy.contents]); + let mut message_id = 0; + + while let Some(message) = messages.pop_front() { + let mut contents = MessageMetadataContents { + html_body: message.html_body.into_iter().map(|c| c as u16).collect(), + text_body: message.text_body.into_iter().map(|c| c as u16).collect(), + attachments: message.attachments.into_iter().map(|c| c as u16).collect(), + parts: Vec::with_capacity(message.parts.len()), + }; + + for part in message.parts { + let body = match part.body { + LegacyMetadataPartType::Text => MetadataPartType::Text, + LegacyMetadataPartType::Html => MetadataPartType::Html, + LegacyMetadataPartType::Binary => MetadataPartType::Binary, + LegacyMetadataPartType::InlineBinary => MetadataPartType::InlineBinary, + LegacyMetadataPartType::Message(message) => { + messages.push_back(message); + message_id += 1; + MetadataPartType::Message(message_id) + } + LegacyMetadataPartType::Multipart(parts) => { + MetadataPartType::Multipart(parts.into_iter().map(|p| p as u16).collect()) + } + }; + + contents.parts.push(MessageMetadataPart { + headers: part + .headers + .into_iter() + .map(|hdr| Header { + name: hdr.name.into_owned(), + value: hdr.value.into(), + offset_field: hdr.offset_field as u32, + offset_start: hdr.offset_start as u32, + offset_end: hdr.offset_end as u32, + }) + .collect(), + is_encoding_problem: part.is_encoding_problem, + encoding: part.encoding, + body, + size: part.size as u32, + offset_header: part.offset_header as u32, + offset_body: part.offset_body as u32, + offset_end: part.offset_end as u32, + }); + } + metadata.contents.push(contents); + } + + metadata + } +} + +pub struct Mailboxes(Vec); +pub struct Keywords(Vec); + +impl Deserialize for Mailboxes { + fn deserialize(bytes: &[u8]) -> trc::Result { + let mut bytes = bytes.iter(); + let len: usize = bytes + .next_leb128() + .ok_or_else(|| trc::StoreEvent::DataCorruption.caused_by(trc::location!()))?; + let mut list = Vec::with_capacity(len); + for _ in 0..len { + list.push(UidMailbox { + mailbox_id: bytes + .next_leb128() + .ok_or_else(|| trc::StoreEvent::DataCorruption.caused_by(trc::location!()))?, + uid: bytes + .next_leb128() + .ok_or_else(|| trc::StoreEvent::DataCorruption.caused_by(trc::location!()))?, + }); + } + Ok(Mailboxes(list)) + } +} + +impl Deserialize for Keywords { + fn deserialize(bytes: &[u8]) -> trc::Result { + let mut bytes = bytes.iter(); + let len: usize = bytes + .next_leb128() + .ok_or_else(|| trc::StoreEvent::DataCorruption.caused_by(trc::location!()))?; + let mut list = Vec::with_capacity(len); + for _ in 0..len { + list.push( + deserialize_keyword(&mut bytes) + .ok_or_else(|| trc::StoreEvent::DataCorruption.caused_by(trc::location!()))?, + ); + } + Ok(Keywords(list)) + } +} + +fn deserialize_keyword(bytes: &mut std::slice::Iter<'_, u8>) -> Option { + match bytes.next_leb128::()? { + SEEN => Some(Keyword::Seen), + DRAFT => Some(Keyword::Draft), + FLAGGED => Some(Keyword::Flagged), + ANSWERED => Some(Keyword::Answered), + RECENT => Some(Keyword::Recent), + IMPORTANT => Some(Keyword::Important), + PHISHING => Some(Keyword::Phishing), + JUNK => Some(Keyword::Junk), + NOTJUNK => Some(Keyword::NotJunk), + DELETED => Some(Keyword::Deleted), + FORWARDED => Some(Keyword::Forwarded), + MDN_SENT => Some(Keyword::MdnSent), + other => { + let len = other - OTHER; + let mut keyword = Vec::with_capacity(len); + for _ in 0..len { + keyword.push(*bytes.next()?); + } + Some(Keyword::Other(String::from_utf8(keyword).ok()?)) + } + } +} + +pub type LegacyMessagePartId = usize; +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LegacyMessageMetadata<'x> { + pub contents: LegacyMessageMetadataContents<'x>, + pub blob_hash: BlobHash, + pub size: usize, + pub received_at: u64, + pub preview: String, + pub has_attachments: bool, + pub raw_headers: Vec, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LegacyMessageMetadataContents<'x> { + pub html_body: Vec, + pub text_body: Vec, + pub attachments: Vec, + pub parts: Vec>, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LegacyMessageMetadataPart<'x> { + pub headers: Vec>, + pub is_encoding_problem: bool, + pub body: LegacyMetadataPartType<'x>, + pub encoding: Encoding, + pub size: usize, + pub offset_header: usize, + pub offset_body: usize, + pub offset_end: usize, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LegacyHeader<'x> { + pub name: HeaderName<'x>, + pub value: LegacyHeaderValue<'x>, + pub offset_field: usize, + pub offset_start: usize, + pub offset_end: usize, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize, Default)] +pub enum LegacyHeaderValue<'x> { + /// Address list or group + Address(Address<'x>), + + /// String + Text(Cow<'x, str>), + + /// List of strings + TextList(Vec>), + + /// Datetime + DateTime(DateTime), + + /// Content-Type or Content-Disposition header + ContentType(LegacyContentType<'x>), + + /// Received header + Received(Box>), + + #[default] + Empty, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct LegacyContentType<'x> { + pub c_type: Cow<'x, str>, + pub c_subtype: Option>, + pub attributes: Option, Cow<'x, str>)>>, +} + +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub enum LegacyMetadataPartType<'x> { + Text, + Html, + Binary, + InlineBinary, + Message(LegacyMessageMetadataContents<'x>), + Multipart(Vec), +} + +impl From> for HeaderValue<'static> { + fn from(value: LegacyHeaderValue<'_>) -> Self { + match value { + LegacyHeaderValue::Address(address) => HeaderValue::Address(address.into_owned()), + LegacyHeaderValue::Text(cow) => HeaderValue::Text(cow.into_owned().into()), + LegacyHeaderValue::TextList(cows) => HeaderValue::TextList( + cows.into_iter() + .map(|cow| cow.into_owned().into()) + .collect(), + ), + LegacyHeaderValue::DateTime(date_time) => HeaderValue::DateTime(date_time), + LegacyHeaderValue::ContentType(legacy_content_type) => { + HeaderValue::ContentType(ContentType { + c_type: legacy_content_type.c_type.into_owned().into(), + c_subtype: legacy_content_type.c_subtype.map(|s| s.into_owned().into()), + attributes: legacy_content_type.attributes.map(|attrs| { + attrs + .into_iter() + .map(|(k, v)| Attribute { + name: k.into_owned().into(), + value: v.into_owned().into(), + }) + .collect() + }), + }) + } + LegacyHeaderValue::Received(received) => { + HeaderValue::Received(Box::new(received.into_owned())) + } + LegacyHeaderValue::Empty => HeaderValue::Empty, + } + } +} + +pub(crate) fn encode_message_id(message_id: &str) -> Vec { + let mut msg_id = Vec::with_capacity(message_id.len() + 1); + msg_id.extend_from_slice(message_id.as_bytes()); + msg_id.push(0); + msg_id +} diff --git a/crates/migration/src/encryption.rs b/crates/migration/src/encryption.rs new file mode 100644 index 00000000..33665f86 --- /dev/null +++ b/crates/migration/src/encryption.rs @@ -0,0 +1,92 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use email::message::crypto::EncryptionParams; +use jmap_proto::types::{collection::Collection, property::Property}; +use store::{ + Deserialize, Serialize, ValueKey, + write::{AlignedBytes, Archive, Archiver, BatchBuilder, ValueClass}, +}; +use trc::AddContext; + +pub(crate) async fn migrate_encryption_params( + server: &Server, + account_id: u32, +) -> trc::Result { + match server + .store() + .get_value::(ValueKey { + account_id, + collection: Collection::Principal.into(), + document_id: 0, + class: ValueClass::Property(Property::Parameters.into()), + }) + .await + { + Ok(Some(legacy)) => { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Principal) + .update_document(0) + .set( + Property::Parameters, + Archiver::new(legacy.0) + .serialize() + .caused_by(trc::location!())?, + ); + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + return Ok(1); + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Principal.into(), + document_id: 0, + class: ValueClass::Property(Property::Parameters.into()), + }) + .await + .is_err() + { + return Err(err.account_id(account_id).caused_by(trc::location!())); + } + } + } + Ok(0) +} + +struct LegacyEncryptionParams(EncryptionParams); + +impl Deserialize for LegacyEncryptionParams { + fn deserialize(bytes: &[u8]) -> trc::Result { + let version = *bytes + .first() + .ok_or_else(|| trc::StoreEvent::DataCorruption.caused_by(trc::location!()))?; + match version { + 1 if bytes.len() > 1 => bincode::deserialize(&bytes[1..]) + .map(LegacyEncryptionParams) + .map_err(|err| { + trc::EventType::Store(trc::StoreEvent::DeserializeError) + .reason(err) + .caused_by(trc::location!()) + }), + + _ => Err(trc::StoreEvent::DeserializeError + .into_err() + .caused_by(trc::location!()) + .ctx(trc::Key::Value, version as u64)), + } + } +} diff --git a/crates/migration/src/identity.rs b/crates/migration/src/identity.rs new file mode 100644 index 00000000..8cab5e3c --- /dev/null +++ b/crates/migration/src/identity.rs @@ -0,0 +1,158 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::object::Object; +use crate::object::FromLegacy; +use common::Server; +use email::identity::{EmailAddress, Identity}; +use jmap_proto::types::{collection::Collection, property::Property, value::Value}; +use store::{ + Serialize, ValueKey, + write::{AlignedBytes, Archive, Archiver, BatchBuilder, ValueClass}, +}; +use trc::AddContext; + +pub(crate) async fn migrate_identities(server: &Server, account_id: u32) -> trc::Result { + // Obtain email ids + let identity_ids = server + .get_document_ids(account_id, Collection::Identity) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_identities = identity_ids.len(); + if num_identities == 0 { + return Ok(0); + } + let mut did_migrate = false; + + for identity_id in identity_ids { + match server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Identity.into(), + document_id: identity_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + { + Ok(Some(legacy)) => { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Identity) + .update_document(identity_id) + .set( + Property::Value, + Archiver::new(Identity::from_legacy(legacy)) + .serialize() + .caused_by(trc::location!())?, + ); + + did_migrate = true; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Identity.into(), + document_id: identity_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + .is_err() + { + return Err(err + .account_id(account_id) + .document_id(identity_id) + .caused_by(trc::location!())); + } + } + } + } + + // Increment document id counter + if did_migrate { + server + .store() + .assign_document_ids(account_id, Collection::Identity, num_identities + 1) + .await + .caused_by(trc::location!())?; + Ok(num_identities) + } else { + Ok(0) + } +} + +impl FromLegacy for Identity { + fn from_legacy(legacy: Object) -> Self { + Identity { + name: legacy + .get(&Property::Name) + .as_string() + .unwrap_or_default() + .to_string(), + email: legacy + .get(&Property::Email) + .as_string() + .unwrap_or_default() + .to_string(), + reply_to: convert_email_addresses(legacy.get(&Property::ReplyTo)), + bcc: convert_email_addresses(legacy.get(&Property::Bcc)), + text_signature: legacy + .get(&Property::TextSignature) + .as_string() + .unwrap_or_default() + .to_string(), + html_signature: legacy + .get(&Property::HtmlSignature) + .as_string() + .unwrap_or_default() + .to_string(), + } + } +} + +fn convert_email_addresses(value: &Value) -> Option> { + if let Value::List(value) = value { + let mut addrs = Vec::with_capacity(value.len()); + for addr in value { + if let Value::Object(obj) = addr { + let mut addr = EmailAddress { + name: None, + email: String::new(), + }; + for (key, value) in &obj.0 { + match (key, value) { + (Property::Email, Value::Text(value)) => { + addr.email = value.to_string(); + } + (Property::Name, Value::Text(value)) => { + addr.name = Some(value.to_string()); + } + _ => { + break; + } + } + } + if !addr.email.is_empty() { + addrs.push(addr); + } + } + } + if !addrs.is_empty() { Some(addrs) } else { None } + } else { + None + } +} diff --git a/crates/migration/src/lib.rs b/crates/migration/src/lib.rs new file mode 100644 index 00000000..6cf55719 --- /dev/null +++ b/crates/migration/src/lib.rs @@ -0,0 +1,347 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use changelog::reset_changelog; +use common::{DATABASE_SCHEMA_VERSION, KV_LOCK_HOUSEKEEPER, Server}; +use jmap_proto::types::{collection::Collection, property::Property}; +use principal::{migrate_principal, migrate_principals}; +use queue::migrate_queue; +use report::migrate_reports; +use store::{ + Deserialize, IterateParams, SUBSPACE_PROPERTY, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_REPORT_IN, + SUBSPACE_REPORT_OUT, SerializeInfallible, U32_LEN, Value, ValueKey, + dispatch::{DocumentSet, lookup::KeyValue}, + rand::{self, seq::SliceRandom}, + write::{AnyClass, AnyKey, BatchBuilder, ValueClass, key::DeserializeBigEndian}, +}; +use trc::AddContext; + +pub mod changelog; +pub mod email; +pub mod encryption; +pub mod identity; +pub mod mailbox; +pub mod object; +pub mod principal; +pub mod push; +pub mod queue; +pub mod report; +pub mod sieve; +pub mod submission; +pub mod threads; + +const LOCK_WAIT_TIME: u64 = 60; + +pub async fn try_migrate(server: &Server) -> trc::Result<()> { + if server + .store() + .get_value::(AnyKey { + subspace: SUBSPACE_PROPERTY, + key: vec![0u8], + }) + .await + .caused_by(trc::location!())? + == Some(DATABASE_SCHEMA_VERSION) + { + return Ok(()); + } + + if !is_new_install(server).await.caused_by(trc::location!())? { + let force_lock = std::env::var("FORCE_LOCK").is_ok(); + let in_memory = server.in_memory_store(); + let principal_ids; + + loop { + if force_lock + || in_memory + .try_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock", LOCK_WAIT_TIME) + .await + .caused_by(trc::location!())? + { + if in_memory + .key_get::<()>(KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_done", + )) + .await + .caused_by(trc::location!())? + .is_none() + { + migrate_queue(server).await.caused_by(trc::location!())?; + migrate_reports(server).await.caused_by(trc::location!())?; + reset_changelog(server).await.caused_by(trc::location!())?; + principal_ids = migrate_principals(server) + .await + .caused_by(trc::location!())?; + + in_memory + .key_set( + KeyValue::new( + KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_done", + ), + b"1".to_vec(), + ) + .expires(86400), + ) + .await + .caused_by(trc::location!())?; + } else { + principal_ids = server + .get_document_ids(u32::MAX, Collection::Principal) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration completed by another node.",) + ); + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock") + .await + .caused_by(trc::location!())?; + break; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration lock busy, waiting 60 seconds.",) + ); + + tokio::time::sleep(std::time::Duration::from_secs(LOCK_WAIT_TIME)).await; + } + } + + if !principal_ids.is_empty() { + let mut principal_ids = principal_ids.into_iter().collect::>(); + principal_ids.shuffle(&mut rand::rng()); + + loop { + let mut skipped_principal_ids = Vec::new(); + let mut num_migrated = 0; + + for principal_id in principal_ids { + let lock_key = format!("migrate_{principal_id}_lock"); + let done_key = format!("migrate_{principal_id}_done"); + + if force_lock + || in_memory + .try_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes(), LOCK_WAIT_TIME) + .await + .caused_by(trc::location!())? + { + if in_memory + .key_get::<()>(KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + done_key.as_bytes(), + )) + .await + .caused_by(trc::location!())? + .is_none() + { + migrate_principal(server, principal_id) + .await + .caused_by(trc::location!())?; + + num_migrated += 1; + + in_memory + .key_set( + KeyValue::new( + KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + done_key.as_bytes(), + ), + b"1".to_vec(), + ) + .expires(86400), + ) + .await + .caused_by(trc::location!())?; + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes()) + .await + .caused_by(trc::location!())?; + } else { + skipped_principal_ids.push(principal_id); + } + } + + if !skipped_principal_ids.is_empty() { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!( + "Migrated {num_migrated} accounts and {} are locked by another node, waiting 60 seconds.", + skipped_principal_ids.len() + ) + ); + tokio::time::sleep(std::time::Duration::from_secs(LOCK_WAIT_TIME)).await; + principal_ids = skipped_principal_ids; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Account migration completed.",) + ); + break; + } + } + } + } + + let mut batch = BatchBuilder::new(); + batch.set( + ValueClass::Any(AnyClass { + subspace: SUBSPACE_PROPERTY, + key: vec![0u8], + }), + DATABASE_SCHEMA_VERSION.serialize(), + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + + Ok(()) +} + +async fn is_new_install(server: &Server) -> trc::Result { + for subspace in [ + SUBSPACE_QUEUE_MESSAGE, + SUBSPACE_REPORT_IN, + SUBSPACE_REPORT_OUT, + SUBSPACE_PROPERTY, + ] { + let mut has_data = false; + + server + .store() + .iterate( + IterateParams::new( + AnyKey { + subspace, + key: vec![0u8], + }, + AnyKey { + subspace, + key: vec![u8::MAX; 16], + }, + ) + .no_values(), + |_, _| { + has_data = true; + + Ok(false) + }, + ) + .await + .caused_by(trc::location!())?; + + if has_data { + return Ok(false); + } + } + + Ok(true) +} + +async fn get_properties( + server: &Server, + account_id: u32, + collection: Collection, + iterate: &I, + property: P, +) -> trc::Result> +where + I: DocumentSet + Send + Sync, + P: AsRef + Sync + Send, + U: Deserialize + 'static, +{ + let property: u8 = property.as_ref().into(); + let collection: u8 = collection.into(); + let expected_results = iterate.len(); + let mut results = Vec::with_capacity(expected_results); + + server + .core + .storage + .data + .iterate( + IterateParams::new( + ValueKey { + account_id, + collection, + document_id: iterate.min(), + class: ValueClass::Property(property), + }, + ValueKey { + account_id, + collection, + document_id: iterate.max(), + class: ValueClass::Property(property), + }, + ), + |key, value| { + let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?; + if iterate.contains(document_id) { + results.push((document_id, U::deserialize(value)?)); + Ok(expected_results == 0 || results.len() < expected_results) + } else { + Ok(true) + } + }, + ) + .await + .add_context(|err| { + err.caused_by(trc::location!()) + .account_id(account_id) + .collection(collection) + .id(property.to_string()) + }) + .map(|_| results) +} + +pub struct LegacyBincode { + pub inner: T, +} + +impl LegacyBincode { + pub fn new(inner: T) -> Self { + Self { inner } + } +} + +impl From> for LegacyBincode { + fn from(_: Value<'static>) -> Self { + unreachable!("From Value called on LegacyBincode") + } +} + +impl Deserialize for LegacyBincode { + fn deserialize(bytes: &[u8]) -> trc::Result { + lz4_flex::decompress_size_prepended(bytes) + .map_err(|err| { + trc::StoreEvent::DecompressError + .ctx(trc::Key::Value, bytes) + .caused_by(trc::location!()) + .reason(err) + }) + .and_then(|result| { + bincode::deserialize(&result).map_err(|err| { + trc::StoreEvent::DataCorruption + .ctx(trc::Key::Value, bytes) + .caused_by(trc::location!()) + .reason(err) + }) + }) + .map(|inner| Self { inner }) + } +} diff --git a/crates/migration/src/mailbox.rs b/crates/migration/src/mailbox.rs new file mode 100644 index 00000000..5065a3a7 --- /dev/null +++ b/crates/migration/src/mailbox.rs @@ -0,0 +1,161 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::object::Object; +use crate::object::FromLegacy; +use common::{Server, config::jmap::settings::SpecialUse}; +use email::mailbox::Mailbox; +use jmap_proto::types::{collection::Collection, property::Property, value::Value}; +use store::{ + SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_INDEXES, Serialize, U64_LEN, ValueKey, + rand, + write::{ + AlignedBytes, AnyKey, Archive, Archiver, BatchBuilder, ValueClass, key::KeySerializer, + }, +}; +use trc::AddContext; +use utils::config::utils::ParseValue; + +pub(crate) async fn migrate_mailboxes(server: &Server, account_id: u32) -> trc::Result { + // Obtain email ids + let mailbox_ids = server + .get_document_ids(account_id, Collection::Mailbox) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_mailboxes = mailbox_ids.len(); + if num_mailboxes == 0 { + return Ok(0); + } + let mut did_migrate = false; + + for mailbox_id in mailbox_ids { + match server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Mailbox.into(), + document_id: mailbox_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + { + Ok(Some(legacy)) => { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Mailbox) + .update_document(mailbox_id) + .set( + Property::Value, + Archiver::new(Mailbox::from_legacy(legacy)) + .serialize() + .caused_by(trc::location!())?, + ); + did_migrate = true; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::Mailbox.into(), + document_id: mailbox_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + .is_err() + { + return Err(err + .account_id(account_id) + .document_id(mailbox_id) + .caused_by(trc::location!())); + } + } + } + } + + // Delete indexes + for subspace in [SUBSPACE_INDEXES, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT] { + server + .store() + .delete_range( + AnyKey { + subspace, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Mailbox)) + .finalize(), + }, + AnyKey { + subspace, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Mailbox)) + .write(&[u8::MAX; 16][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + } + + // Increment document id counter + if did_migrate { + server + .store() + .assign_document_ids(account_id, Collection::Mailbox, num_mailboxes + 1) + .await + .caused_by(trc::location!())?; + Ok(num_mailboxes) + } else { + Ok(0) + } +} + +impl FromLegacy for Mailbox { + fn from_legacy(legacy: Object) -> Self { + Mailbox { + name: legacy + .get(&Property::Name) + .as_string() + .unwrap_or_default() + .to_string(), + role: legacy + .get(&Property::Role) + .as_string() + .and_then(|r| SpecialUse::parse_value(r).ok()) + .unwrap_or(SpecialUse::None), + parent_id: legacy + .get(&Property::ParentId) + .as_uint() + .unwrap_or_default() as u32, + sort_order: legacy.get(&Property::SortOrder).as_uint().map(|s| s as u32), + uid_validity: rand::random(), + subscribers: legacy + .get(&Property::IsSubscribed) + .as_list() + .map(|s| s.as_slice()) + .unwrap_or_default() + .iter() + .filter_map(|s| s.as_uint()) + .map(|s| s as u32) + .collect(), + acls: legacy + .get(&Property::Acl) + .as_acl() + .cloned() + .unwrap_or_default(), + } + } +} diff --git a/crates/migration/src/object.rs b/crates/migration/src/object.rs new file mode 100644 index 00000000..88039db0 --- /dev/null +++ b/crates/migration/src/object.rs @@ -0,0 +1,336 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use jmap_proto::types::{ + blob::BlobId, + date::UTCDate, + id::Id, + keyword::*, + property::{HeaderForm, HeaderProperty, Property}, + value::{AclGrant, Value}, +}; +use std::slice::Iter; +use store::{Deserialize, U64_LEN}; +use utils::{ + codec::leb128::Leb128Iterator, + map::{bitmap::Bitmap, vec_map::VecMap}, +}; + +#[derive(Debug, Clone, Default, serde::Serialize, PartialEq, Eq)] +#[serde(transparent)] +pub struct Object { + pub properties: VecMap, +} + +impl Object { + pub fn with_capacity(capacity: usize) -> Self { + Self { + properties: VecMap::with_capacity(capacity), + } + } + + pub fn set(&mut self, property: Property, value: impl Into) -> bool { + self.properties.set(property, value.into()) + } + + pub fn append(&mut self, property: Property, value: impl Into) { + self.properties.append(property, value.into()); + } + + pub fn with_property(mut self, property: Property, value: impl Into) -> Self { + self.properties.append(property, value.into()); + self + } + + pub fn remove(&mut self, property: &Property) -> Value { + self.properties.remove(property).unwrap_or(Value::Null) + } + + pub fn get(&self, property: &Property) -> &Value { + self.properties.get(property).unwrap_or(&Value::Null) + } +} + +const TEXT: u8 = 0; +const UNSIGNED_INT: u8 = 1; +const BOOL_TRUE: u8 = 2; +const BOOL_FALSE: u8 = 3; +const ID: u8 = 4; +const DATE: u8 = 5; +const BLOB_ID: u8 = 6; +const BLOB: u8 = 7; +const KEYWORD: u8 = 8; +const LIST: u8 = 9; +const OBJECT: u8 = 10; +const ACL: u8 = 11; +const NULL: u8 = 12; + +pub trait DeserializeFrom: Sized { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option; +} + +impl Deserialize for Object { + fn deserialize(bytes: &[u8]) -> trc::Result { + Object::deserialize_from(&mut bytes.iter()).ok_or_else(|| { + trc::StoreEvent::DataCorruption + .caused_by(trc::location!()) + .ctx(trc::Key::Value, bytes) + }) + } +} + +impl DeserializeFrom for AclGrant { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option { + let account_id = bytes.next_leb128()?; + let mut grants = [0u8; U64_LEN]; + for byte in grants.iter_mut() { + *byte = *bytes.next()?; + } + + Some(Self { + account_id, + grants: Bitmap::from(u64::from_be_bytes(grants)), + }) + } +} + +impl DeserializeFrom for Object { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option> { + let len = bytes.next_leb128()?; + let mut properties = VecMap::with_capacity(len); + for _ in 0..len { + let key = Property::deserialize_from(bytes)?; + let value = Value::deserialize_from(bytes)?; + properties.append(key, value); + } + Some(Object { properties }) + } +} + +impl DeserializeFrom for Value { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option { + match *bytes.next()? { + TEXT => Some(Value::Text(String::deserialize_from(bytes)?)), + UNSIGNED_INT => Some(Value::UnsignedInt(bytes.next_leb128()?)), + BOOL_TRUE => Some(Value::Bool(true)), + BOOL_FALSE => Some(Value::Bool(false)), + ID => Some(Value::Id(Id::new(bytes.next_leb128()?))), + DATE => Some(Value::Date(UTCDate::from_timestamp( + bytes.next_leb128::()? as i64, + ))), + BLOB_ID => Some(Value::BlobId(BlobId::deserialize_from(bytes)?)), + KEYWORD => Some(Value::Keyword(Keyword::deserialize_from(bytes)?)), + LIST => { + let len = bytes.next_leb128()?; + let mut items = Vec::with_capacity(len); + for _ in 0..len { + items.push(Value::deserialize_from(bytes)?); + } + Some(Value::List(items)) + } + OBJECT => Some(Value::Object(jmap_proto::types::value::Object( + Object::deserialize_from(bytes)?.properties, + ))), + BLOB => Some(Value::Blob(Vec::deserialize_from(bytes)?)), + ACL => { + let len = bytes.next_leb128()?; + let mut items = Vec::with_capacity(len); + for _ in 0..len { + items.push(AclGrant::deserialize_from(bytes)?); + } + Some(Value::Acl(items)) + } + NULL => Some(Value::Null), + _ => None, + } + } +} + +impl DeserializeFrom for u32 { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option { + bytes.next_leb128() + } +} + +impl DeserializeFrom for u64 { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option { + bytes.next_leb128() + } +} + +impl DeserializeFrom for String { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option { + >::deserialize_from(bytes).and_then(|s| String::from_utf8(s).ok()) + } +} + +impl DeserializeFrom for Vec { + fn deserialize_from(bytes: &mut Iter<'_, u8>) -> Option { + let len: usize = bytes.next_leb128()?; + let mut buf = Vec::with_capacity(len); + for _ in 0..len { + buf.push(*bytes.next()?); + } + buf.into() + } +} + +impl DeserializeFrom for BlobId { + fn deserialize_from(bytes: &mut std::slice::Iter<'_, u8>) -> Option { + BlobId::from_iter(bytes) + } +} + +impl DeserializeFrom for Keyword { + fn deserialize_from(bytes: &mut std::slice::Iter<'_, u8>) -> Option { + match bytes.next_leb128::()? { + SEEN => Some(Keyword::Seen), + DRAFT => Some(Keyword::Draft), + FLAGGED => Some(Keyword::Flagged), + ANSWERED => Some(Keyword::Answered), + RECENT => Some(Keyword::Recent), + IMPORTANT => Some(Keyword::Important), + PHISHING => Some(Keyword::Phishing), + JUNK => Some(Keyword::Junk), + NOTJUNK => Some(Keyword::NotJunk), + DELETED => Some(Keyword::Deleted), + FORWARDED => Some(Keyword::Forwarded), + MDN_SENT => Some(Keyword::MdnSent), + other => { + let len = other - OTHER; + let mut keyword = Vec::with_capacity(len); + for _ in 0..len { + keyword.push(*bytes.next()?); + } + Some(Keyword::Other(String::from_utf8(keyword).ok()?)) + } + } + } +} + +impl DeserializeFrom for Property { + fn deserialize_from(bytes: &mut std::slice::Iter<'_, u8>) -> Option { + match *bytes.next()? { + 0 => Some(Property::IsActive), + 1 => Some(Property::IsEnabled), + 2 => Some(Property::IsSubscribed), + 3 => Some(Property::Keys), + 4 => Some(Property::Keywords), + 5 => Some(Property::Language), + 6 => Some(Property::Location), + 7 => Some(Property::MailboxIds), + 8 => Some(Property::MayDelete), + 9 => Some(Property::MdnBlobIds), + 10 => Some(Property::Members), + 11 => Some(Property::MessageId), + 12 => Some(Property::MyRights), + 13 => Some(Property::Name), + 14 => Some(Property::ParentId), + 15 => Some(Property::PartId), + 16 => Some(Property::Picture), + 17 => Some(Property::Preview), + 18 => Some(Property::Quota), + 19 => Some(Property::ReceivedAt), + 20 => Some(Property::References), + 21 => Some(Property::ReplyTo), + 22 => Some(Property::Role), + 23 => Some(Property::Secret), + 24 => Some(Property::SendAt), + 25 => Some(Property::Sender), + 26 => Some(Property::SentAt), + 27 => Some(Property::Size), + 28 => Some(Property::SortOrder), + 29 => Some(Property::Subject), + 30 => Some(Property::SubParts), + 31 => Some(Property::TextBody), + 32 => Some(Property::TextSignature), + 33 => Some(Property::ThreadId), + 34 => Some(Property::Timezone), + 35 => Some(Property::To), + 36 => Some(Property::ToDate), + 37 => Some(Property::TotalEmails), + 38 => Some(Property::TotalThreads), + 39 => Some(Property::Type), + 40 => Some(Property::Types), + 41 => Some(Property::UndoStatus), + 42 => Some(Property::UnreadEmails), + 43 => Some(Property::UnreadThreads), + 44 => Some(Property::Url), + 45 => Some(Property::VerificationCode), + 46 => Some(Property::Parameters), + 47 => Some(Property::Addresses), + 48 => Some(Property::P256dh), + 49 => Some(Property::Auth), + 50 => Some(Property::Value), + 51 => Some(Property::SmtpReply), + 52 => Some(Property::Delivered), + 53 => Some(Property::Displayed), + 54 => Some(Property::MailFrom), + 55 => Some(Property::RcptTo), + 56 => Some(Property::IsEncodingProblem), + 57 => Some(Property::IsTruncated), + 58 => Some(Property::MayReadItems), + 59 => Some(Property::MayAddItems), + 60 => Some(Property::MayRemoveItems), + 61 => Some(Property::MaySetSeen), + 62 => Some(Property::MaySetKeywords), + 63 => Some(Property::MayCreateChild), + 64 => Some(Property::MayRename), + 65 => Some(Property::MaySubmit), + 66 => Some(Property::Acl), + 67 => Some(Property::Aliases), + 68 => Some(Property::Attachments), + 69 => Some(Property::Bcc), + 70 => Some(Property::BlobId), + 71 => Some(Property::BodyStructure), + 72 => Some(Property::BodyValues), + 73 => Some(Property::Capabilities), + 74 => Some(Property::Cc), + 75 => Some(Property::Charset), + 76 => Some(Property::Cid), + 77 => Some(Property::DeliveryStatus), + 78 => Some(Property::Description), + 79 => Some(Property::DeviceClientId), + 80 => Some(Property::Disposition), + 81 => Some(Property::DsnBlobIds), + 82 => Some(Property::Email), + 83 => Some(Property::EmailId), + 84 => Some(Property::EmailIds), + 85 => Some(Property::Envelope), + 86 => Some(Property::Expires), + 87 => Some(Property::From), + 88 => Some(Property::FromDate), + 89 => Some(Property::HasAttachment), + 90 => Some(Property::Header(HeaderProperty { + form: HeaderForm::Raw, + header: String::new(), + all: false, + })), // Never serialized + 91 => Some(Property::Headers), + 92 => Some(Property::HtmlBody), + 93 => Some(Property::HtmlSignature), + 94 => Some(Property::Id), + 95 => Some(Property::IdentityId), + 96 => Some(Property::InReplyTo), + 97 => String::deserialize_from(bytes).map(Property::_T), + 98 => Some(Property::ResourceType), + 99 => Some(Property::Used), + 100 => Some(Property::HardLimit), + 101 => Some(Property::WarnLimit), + 102 => Some(Property::SoftLimit), + 103 => Some(Property::Scope), + _ => None, + } + } +} + +pub trait FromLegacy { + fn from_legacy(legacy: Object) -> Self; +} + +pub trait TryFromLegacy: Sized { + fn try_from_legacy(legacy: Object) -> Option; +} diff --git a/crates/migration/src/principal.rs b/crates/migration/src/principal.rs new file mode 100644 index 00000000..0ed6a3c2 --- /dev/null +++ b/crates/migration/src/principal.rs @@ -0,0 +1,373 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use directory::{ + Permission, PermissionGrant, Principal, PrincipalData, PrincipalQuota, ROLE_ADMIN, ROLE_USER, + Type, + backend::internal::{PrincipalField, PrincipalSet}, +}; +use jmap_proto::types::collection::Collection; +use nlp::tokenizers::word::WordTokenizer; +use std::{slice::Iter, time::Instant}; +use store::{ + Deserialize, Serialize, ValueKey, + ahash::{AHashMap, AHashSet}, + backend::MAX_TOKEN_LENGTH, + roaring::RoaringBitmap, + write::{AlignedBytes, Archive, Archiver, BatchBuilder, DirectoryClass, ValueClass}, +}; +use trc::AddContext; +use utils::codec::leb128::Leb128Iterator; + +use crate::{ + email::migrate_emails, encryption::migrate_encryption_params, identity::migrate_identities, + mailbox::migrate_mailboxes, push::migrate_push_subscriptions, sieve::migrate_sieve, + submission::migrate_email_submissions, threads::migrate_threads, +}; + +pub(crate) async fn migrate_principals(server: &Server) -> trc::Result { + // Obtain email ids + let principal_ids = server + .get_document_ids(u32::MAX, Collection::Principal) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_principals = principal_ids.len(); + if num_principals == 0 { + return Ok(principal_ids); + } + let mut num_migrated = 0; + + for principal_id in principal_ids.iter() { + match server + .store() + .get_value::(ValueKey { + account_id: u32::MAX, + collection: Collection::Principal.into(), + document_id: principal_id, + class: ValueClass::Directory(DirectoryClass::Principal(principal_id)), + }) + .await + { + Ok(Some(legacy)) => { + let principal = Principal::from_legacy(legacy); + let mut batch = BatchBuilder::new(); + batch + .with_account_id(u32::MAX) + .with_collection(Collection::Principal) + .update_document(principal_id); + + build_search_index(&mut batch, principal_id, &principal); + + batch.set( + ValueClass::Directory(DirectoryClass::Principal(principal_id)), + Archiver::new(principal) + .serialize() + .caused_by(trc::location!())?, + ); + num_migrated += 1; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id: u32::MAX, + collection: Collection::Principal.into(), + document_id: principal_id, + class: ValueClass::Directory(DirectoryClass::Principal(principal_id)), + }) + .await + .is_err() + { + return Err(err.account_id(principal_id).caused_by(trc::location!())); + } + } + } + } + + // Increment document id counter + if num_migrated > 0 { + server + .store() + .assign_document_ids(u32::MAX, Collection::Principal, num_principals + 1) + .await + .caused_by(trc::location!())?; + + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migrated {num_migrated} principals",) + ); + } + + Ok(principal_ids) +} + +pub(crate) async fn migrate_principal(server: &Server, account_id: u32) -> trc::Result<()> { + let start_time = Instant::now(); + let num_emails = migrate_emails(server, account_id) + .await + .caused_by(trc::location!())?; + let num_mailboxes = migrate_mailboxes(server, account_id) + .await + .caused_by(trc::location!())?; + let num_params = migrate_encryption_params(server, account_id) + .await + .caused_by(trc::location!())?; + let num_subscriptions = migrate_push_subscriptions(server, account_id) + .await + .caused_by(trc::location!())?; + let num_sieve = migrate_sieve(server, account_id) + .await + .caused_by(trc::location!())?; + let num_submissions = migrate_email_submissions(server, account_id) + .await + .caused_by(trc::location!())?; + let num_threads = migrate_threads(server, account_id) + .await + .caused_by(trc::location!())?; + let num_identities = migrate_identities(server, account_id) + .await + .caused_by(trc::location!())?; + + if num_emails > 0 + || num_mailboxes > 0 + || num_params > 0 + || num_subscriptions > 0 + || num_sieve > 0 + || num_submissions > 0 + || num_threads > 0 + || num_identities > 0 + { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!( + "Migrated accountId {account_id} with {num_emails} emails, {num_mailboxes} mailboxes, {num_params} encryption params, {num_submissions} email submissions, {num_sieve} sieve scripts, {num_subscriptions} push subscriptions, {num_threads} threads, and {num_identities} identities" + ), + Elapsed = start_time.elapsed() + ); + } + + Ok(()) +} + +trait FromLegacy { + fn from_legacy(legacy: LegacyPrincipal) -> Self; +} + +impl FromLegacy for Principal { + fn from_legacy(legacy: LegacyPrincipal) -> Self { + let mut legacy = legacy.0; + let mut principal = Principal { + id: legacy.id, + typ: legacy.typ, + tenant: legacy.tenant(), + name: legacy.name().to_string(), + description: legacy.take_str(PrincipalField::Description), + secrets: Default::default(), + emails: Default::default(), + quota: Default::default(), + data: Default::default(), + }; + + // Map fields + principal.secrets = legacy + .take_str_array(PrincipalField::Secrets) + .unwrap_or_default(); + principal.emails = legacy + .take_str_array(PrincipalField::Emails) + .unwrap_or_default(); + if let Some(picture) = legacy.take_str(PrincipalField::Picture) { + principal.data.push(PrincipalData::Picture(picture)); + } + if let Some(urls) = legacy.take_str_array(PrincipalField::Urls) { + principal.data.push(PrincipalData::Urls(urls)); + } + if let Some(urls) = legacy.take_str_array(PrincipalField::ExternalMembers) { + principal.data.push(PrincipalData::ExternalMembers(urls)); + } + if let Some(quotas) = legacy.take_int_array(PrincipalField::Quota) { + let mut principal_quotas = Vec::new(); + + for (idx, quota) in quotas.into_iter().take(Type::MAX_ID + 2).enumerate() { + if idx != 0 { + principal_quotas.push(PrincipalQuota { + quota, + typ: Type::from_u8((idx - 1) as u8), + }); + } else if quota != 0 { + principal.quota = Some(quota); + } + } + + if !principal_quotas.is_empty() { + principal + .data + .push(PrincipalData::PrincipalQuota(principal_quotas)); + } + } + + // Map permissions + let mut permissions = AHashMap::new(); + for field in [ + PrincipalField::EnabledPermissions, + PrincipalField::DisabledPermissions, + ] { + let is_disabled = field == PrincipalField::DisabledPermissions; + if let Some(ids) = legacy.take_int_array(field) { + for id in ids { + if let Some(permission) = Permission::from_id(id as usize) { + permissions.insert(permission, is_disabled); + } + } + } + } + if !permissions.is_empty() { + principal.data.push(PrincipalData::Permissions( + permissions + .into_iter() + .map(|(k, v)| PermissionGrant { + permission: k, + grant: !v, + }) + .collect(), + )); + } + + principal + } +} + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct LegacyPrincipal(PrincipalSet); + +impl Deserialize for LegacyPrincipal { + fn deserialize(bytes: &[u8]) -> trc::Result { + deserialize(bytes).ok_or_else(|| { + trc::StoreEvent::DataCorruption + .caused_by(trc::location!()) + .ctx(trc::Key::Value, bytes) + }) + } +} + +const INT_MARKER: u8 = 1 << 7; + +fn deserialize(bytes: &[u8]) -> Option { + let mut bytes = bytes.iter(); + + match *bytes.next()? { + 1 => { + // Version 1 (legacy) + let id = bytes.next_leb128()?; + let type_id = *bytes.next()?; + + let mut principal = PrincipalSet { + id, + typ: Type::from_u8(type_id), + ..Default::default() + }; + + principal.set(PrincipalField::Quota, bytes.next_leb128::()?); + principal.set(PrincipalField::Name, deserialize_string(&mut bytes)?); + if let Some(description) = deserialize_string(&mut bytes).filter(|s| !s.is_empty()) { + principal.set(PrincipalField::Description, description); + } + for key in [PrincipalField::Secrets, PrincipalField::Emails] { + for _ in 0..bytes.next_leb128::()? { + principal.append_str(key, deserialize_string(&mut bytes)?); + } + } + + LegacyPrincipal(principal.with_field( + PrincipalField::Roles, + if type_id != 4 { ROLE_USER } else { ROLE_ADMIN }, + )) + .into() + } + 2 => { + // Version 2 + let typ = Type::from_u8(*bytes.next()?); + let num_fields = bytes.next_leb128::()?; + + let mut principal = PrincipalSet { + id: u32::MAX, + typ, + fields: AHashMap::with_capacity(num_fields), + }; + + for _ in 0..num_fields { + let id = *bytes.next()?; + let num_values = bytes.next_leb128::()?; + + if (id & INT_MARKER) == 0 { + let field = PrincipalField::from_id(id)?; + if num_values == 1 { + principal.set(field, deserialize_string(&mut bytes)?); + } else { + let mut values = Vec::with_capacity(num_values); + for _ in 0..num_values { + values.push(deserialize_string(&mut bytes)?); + } + principal.set(field, values); + } + } else { + let field = PrincipalField::from_id(id & !INT_MARKER)?; + if num_values == 1 { + principal.set(field, bytes.next_leb128::()?); + } else { + let mut values = Vec::with_capacity(num_values); + for _ in 0..num_values { + values.push(bytes.next_leb128::()?); + } + principal.set(field, values); + } + } + } + + LegacyPrincipal(principal).into() + } + _ => None, + } +} + +fn deserialize_string(bytes: &mut Iter<'_, u8>) -> Option { + let len = bytes.next_leb128()?; + let mut string = Vec::with_capacity(len); + for _ in 0..len { + string.push(*bytes.next()?); + } + String::from_utf8(string).ok() +} + +pub(crate) fn build_search_index(batch: &mut BatchBuilder, principal_id: u32, new: &Principal) { + let mut new_words = AHashSet::new(); + + for word in [Some(new.name.as_str()), new.description.as_deref()] + .into_iter() + .chain(new.emails.iter().map(|s| Some(s.as_str()))) + .flatten() + { + new_words.extend(WordTokenizer::new(word, MAX_TOKEN_LENGTH).map(|t| t.word)); + } + + for word in new_words { + batch.set( + DirectoryClass::Index { + word: word.as_bytes().to_vec(), + principal_id, + }, + vec![], + ); + } +} diff --git a/crates/migration/src/push.rs b/crates/migration/src/push.rs new file mode 100644 index 00000000..95d05b5a --- /dev/null +++ b/crates/migration/src/push.rs @@ -0,0 +1,175 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::object::Object; +use crate::object::FromLegacy; +use base64::{Engine, engine::general_purpose}; +use common::Server; +use email::push::{Keys, PushSubscription}; +use jmap_proto::types::{ + collection::Collection, property::Property, type_state::DataType, value::Value, +}; +use store::{ + Serialize, ValueKey, + write::{AlignedBytes, Archive, Archiver, BatchBuilder, ValueClass}, +}; +use trc::AddContext; + +pub(crate) async fn migrate_push_subscriptions( + server: &Server, + account_id: u32, +) -> trc::Result { + // Obtain email ids + let push_subscription_ids = server + .get_document_ids(account_id, Collection::PushSubscription) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_push_subscriptions = push_subscription_ids.len(); + if num_push_subscriptions == 0 { + return Ok(0); + } + let mut did_migrate = false; + + for push_subscription_id in push_subscription_ids { + match server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::PushSubscription.into(), + document_id: push_subscription_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + { + Ok(Some(legacy)) => { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::PushSubscription) + .update_document(push_subscription_id) + .set( + Property::Value, + Archiver::new(PushSubscription::from_legacy(legacy)) + .serialize() + .caused_by(trc::location!())?, + ); + did_migrate = true; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::PushSubscription.into(), + document_id: push_subscription_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + .is_err() + { + return Err(err + .account_id(account_id) + .document_id(push_subscription_id) + .caused_by(trc::location!())); + } + } + } + } + + // Increment document id counter + if did_migrate { + server + .store() + .assign_document_ids( + account_id, + Collection::PushSubscription, + num_push_subscriptions + 1, + ) + .await + .caused_by(trc::location!())?; + Ok(num_push_subscriptions) + } else { + Ok(0) + } +} + +impl FromLegacy for PushSubscription { + fn from_legacy(legacy: Object) -> Self { + let (verification_code, verified) = legacy + .get(&Property::VerificationCode) + .as_string() + .map(|c| (c.to_string(), true)) + .or_else(|| { + legacy + .get(&Property::Value) + .as_string() + .map(|c| (c.to_string(), false)) + }) + .unwrap_or_default(); + + PushSubscription { + url: legacy + .get(&Property::Url) + .as_string() + .unwrap_or_default() + .to_string(), + device_client_id: legacy + .get(&Property::DeviceClientId) + .as_string() + .unwrap_or_default() + .to_string(), + expires: legacy + .get(&Property::Expires) + .as_date() + .map(|s| s.timestamp() as u64) + .unwrap_or_default(), + verification_code, + verified, + types: legacy + .get(&Property::Types) + .as_list() + .map(|l| l.as_slice()) + .unwrap_or_default() + .iter() + .filter_map(|v| v.as_string().and_then(|v| DataType::try_from(v).ok())) + .collect(), + keys: convert_keys(legacy.get(&Property::Keys)), + } + } +} + +fn convert_keys(value: &Value) -> Option { + let mut addr = Keys { + p256dh: Default::default(), + auth: Default::default(), + }; + if let Value::Object(obj) = value { + for (key, value) in &obj.0 { + match (key, value) { + (Property::Auth, Value::Text(value)) => { + addr.auth = general_purpose::URL_SAFE.decode(value).unwrap_or_default(); + } + (Property::P256dh, Value::Text(value)) => { + addr.p256dh = general_purpose::URL_SAFE.decode(value).unwrap_or_default(); + } + _ => {} + } + } + } + if !addr.p256dh.is_empty() && !addr.auth.is_empty() { + Some(addr) + } else { + None + } +} diff --git a/crates/migration/src/queue.rs b/crates/migration/src/queue.rs new file mode 100644 index 00000000..500499c9 --- /dev/null +++ b/crates/migration/src/queue.rs @@ -0,0 +1,100 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use crate::LegacyBincode; +use common::Server; +use smtp::queue::Message; +use store::{ + IterateParams, Serialize, U64_LEN, ValueKey, + ahash::AHashSet, + write::{ + AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, ValueClass, + key::DeserializeBigEndian, + }, +}; +use trc::AddContext; + +pub(crate) async fn migrate_queue(server: &Server) -> trc::Result<()> { + let from_key = ValueKey::from(ValueClass::Queue(QueueClass::MessageEvent( + store::write::QueueEvent { + due: 0, + queue_id: 0, + }, + ))); + let to_key = ValueKey::from(ValueClass::Queue(QueueClass::MessageEvent( + store::write::QueueEvent { + due: u64::MAX, + queue_id: u64::MAX, + }, + ))); + + let mut queue_ids = AHashSet::new(); + + server + .store() + .iterate( + IterateParams::new(from_key, to_key).ascending().no_values(), + |key, _| { + queue_ids.insert(key.deserialize_be_u64(U64_LEN)?); + + Ok(true) + }, + ) + .await + .caused_by(trc::location!())?; + + let count = queue_ids.len(); + + for queue_id in queue_ids { + match server + .store() + .get_value::>(ValueKey::from(ValueClass::Queue( + QueueClass::Message(queue_id), + ))) + .await + { + Ok(Some(bincoded)) => { + let mut batch = BatchBuilder::new(); + batch.set( + ValueClass::Queue(QueueClass::Message(queue_id)), + Archiver::new(bincoded.inner) + .serialize() + .caused_by(trc::location!())?, + ); + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey::from(ValueClass::Queue( + QueueClass::Message(queue_id), + ))) + .await + .is_err() + { + return Err(err + .ctx(trc::Key::QueueId, queue_id) + .caused_by(trc::location!())); + } + } + } + } + + if count > 0 { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migrated {count} queued messages",) + ); + } + + Ok(()) +} diff --git a/crates/migration/src/report.rs b/crates/migration/src/report.rs new file mode 100644 index 00000000..351f8dba --- /dev/null +++ b/crates/migration/src/report.rs @@ -0,0 +1,227 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use crate::LegacyBincode; +use common::Server; +use mail_auth::report::{Feedback, Report, tlsrpt::TlsReport}; +use smtp::reporting::analysis::IncomingReport; +use store::{ + IterateParams, SUBSPACE_REPORT_OUT, Serialize, U64_LEN, ValueKey, + ahash::AHashSet, + write::{ + AlignedBytes, AnyKey, BatchBuilder, ReportClass, UnversionedArchive, UnversionedArchiver, + ValueClass, + key::{DeserializeBigEndian, KeySerializer}, + }, +}; +use trc::AddContext; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +enum ReportType { + Dmarc, + Tls, + Arf, +} + +pub(crate) async fn migrate_reports(server: &Server) -> trc::Result<()> { + let mut num_dmarc = 0; + let mut num_tls = 0; + let mut num_arf = 0; + + for report in [ReportType::Dmarc, ReportType::Tls, ReportType::Arf] { + let (from_key, to_key) = match report { + ReportType::Dmarc => ( + ValueKey::from(ValueClass::Report(ReportClass::Dmarc { id: 0, expires: 0 })), + ValueKey::from(ValueClass::Report(ReportClass::Dmarc { + id: u64::MAX, + expires: u64::MAX, + })), + ), + ReportType::Tls => ( + ValueKey::from(ValueClass::Report(ReportClass::Tls { id: 0, expires: 0 })), + ValueKey::from(ValueClass::Report(ReportClass::Tls { + id: u64::MAX, + expires: u64::MAX, + })), + ), + ReportType::Arf => ( + ValueKey::from(ValueClass::Report(ReportClass::Arf { id: 0, expires: 0 })), + ValueKey::from(ValueClass::Report(ReportClass::Arf { + id: u64::MAX, + expires: u64::MAX, + })), + ), + }; + + let mut results = AHashSet::new(); + + server + .core + .storage + .data + .iterate( + IterateParams::new(from_key, to_key).no_values(), + |key, _| { + results.insert(( + report, + key.deserialize_be_u64(U64_LEN + 1)?, + key.deserialize_be_u64(1)?, + )); + + Ok(true) + }, + ) + .await + .caused_by(trc::location!())?; + + for (report, id, expires) in results { + match report { + ReportType::Dmarc => { + match server + .store() + .get_value::>>(ValueKey::from( + ValueClass::Report(ReportClass::Dmarc { id, expires }), + )) + .await + { + Ok(Some(bincoded)) => { + let mut batch = BatchBuilder::new(); + batch.set( + ValueClass::Report(ReportClass::Dmarc { id, expires }), + UnversionedArchiver::new(bincoded.inner) + .serialize() + .caused_by(trc::location!())?, + ); + num_dmarc += 1; + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey::from( + ValueClass::Report(ReportClass::Dmarc { id, expires }), + )) + .await + .is_err() + { + return Err(err.ctx(trc::Key::Id, id).caused_by(trc::location!())); + } + } + } + } + ReportType::Tls => { + match server + .store() + .get_value::>>(ValueKey::from( + ValueClass::Report(ReportClass::Tls { id, expires }), + )) + .await + { + Ok(Some(bincoded)) => { + let mut batch = BatchBuilder::new(); + batch.set( + ValueClass::Report(ReportClass::Tls { id, expires }), + UnversionedArchiver::new(bincoded.inner) + .serialize() + .caused_by(trc::location!())?, + ); + num_tls += 1; + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey::from( + ValueClass::Report(ReportClass::Tls { id, expires }), + )) + .await + .is_err() + { + return Err(err.ctx(trc::Key::Id, id).caused_by(trc::location!())); + } + } + } + } + ReportType::Arf => { + match server + .store() + .get_value::>>(ValueKey::from( + ValueClass::Report(ReportClass::Arf { id, expires }), + )) + .await + { + Ok(Some(bincoded)) => { + let mut batch = BatchBuilder::new(); + batch.set( + ValueClass::Report(ReportClass::Arf { id, expires }), + UnversionedArchiver::new(bincoded.inner) + .serialize() + .caused_by(trc::location!())?, + ); + num_arf += 1; + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey::from( + ValueClass::Report(ReportClass::Arf { id, expires }), + )) + .await + .is_err() + { + return Err(err.ctx(trc::Key::Id, id).caused_by(trc::location!())); + } + } + } + } + } + } + } + + // Delete outgoing reports + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_REPORT_OUT, + key: KeySerializer::new(U64_LEN).write(0u8).finalize(), + }, + AnyKey { + subspace: SUBSPACE_REPORT_OUT, + key: KeySerializer::new(U64_LEN) + .write(&[u8::MAX; 16][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + + if num_dmarc > 0 || num_tls > 0 || num_arf > 0 { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = + format!("Migrated {num_dmarc} DMARC, {num_tls} TLS, and {num_arf} ARF reports") + ); + } + + Ok(()) +} diff --git a/crates/migration/src/sieve.rs b/crates/migration/src/sieve.rs new file mode 100644 index 00000000..ede23a8c --- /dev/null +++ b/crates/migration/src/sieve.rs @@ -0,0 +1,222 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::object::Object; +use crate::object::TryFromLegacy; +use common::Server; +use email::sieve::{SieveScript, VacationResponse}; +use jmap_proto::types::{collection::Collection, property::Property, value::Value}; +use store::{ + SUBSPACE_BITMAP_TEXT, SUBSPACE_INDEXES, SUBSPACE_PROPERTY, Serialize, U64_LEN, ValueKey, + write::{ + AlignedBytes, AnyKey, Archive, Archiver, BatchBuilder, ValueClass, key::KeySerializer, + }, +}; +use trc::{AddContext, StoreEvent}; + +pub(crate) async fn migrate_sieve(server: &Server, account_id: u32) -> trc::Result { + // Obtain email ids + let script_ids = server + .get_document_ids(account_id, Collection::SieveScript) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_scripts = script_ids.len(); + if num_scripts == 0 { + return Ok(0); + } + let mut did_migrate = false; + + // Delete indexes + for subspace in [SUBSPACE_INDEXES, SUBSPACE_BITMAP_TEXT] { + server + .store() + .delete_range( + AnyKey { + subspace, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::SieveScript)) + .finalize(), + }, + AnyKey { + subspace, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::SieveScript)) + .write(&[u8::MAX; 16][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + } + + for script_id in script_ids { + match server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::SieveScript.into(), + document_id: script_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + { + Ok(Some(legacy)) => { + if let Some(script) = SieveScript::try_from_legacy(legacy) { + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::SieveScript) + .update_document(script_id) + .index( + Property::IsActive, + if script.is_active { + vec![1u8] + } else { + vec![0u8] + }, + ) + .index(Property::Name, script.name.to_lowercase()) + .set( + Property::Value, + Archiver::new(script) + .serialize() + .caused_by(trc::location!())?, + ); + did_migrate = true; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } else { + trc::event!( + Store(StoreEvent::DataCorruption), + Details = "Failed to migrate SieveScript", + AccountId = account_id, + ) + } + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::SieveScript.into(), + document_id: script_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + .is_err() + { + return Err(err + .account_id(account_id) + .document_id(script_id) + .caused_by(trc::location!())); + } + } + } + } + + // Delete emailIds property + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_PROPERTY, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::SieveScript)) + .write(u8::from(Property::EmailIds)) + .finalize(), + }, + AnyKey { + subspace: SUBSPACE_PROPERTY, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::SieveScript)) + .write(u8::from(Property::EmailIds)) + .write(&[u8::MAX; 8][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + + // Increment document id counter + if did_migrate { + server + .store() + .assign_document_ids(account_id, Collection::SieveScript, num_scripts + 1) + .await + .caused_by(trc::location!())?; + Ok(num_scripts) + } else { + Ok(0) + } +} + +impl TryFromLegacy for SieveScript { + fn try_from_legacy(legacy: Object) -> Option { + let blob_id = legacy.get(&Property::BlobId).as_blob_id()?; + Some(SieveScript { + name: legacy + .get(&Property::Name) + .as_string() + .unwrap_or_default() + .to_string(), + is_active: legacy + .get(&Property::IsActive) + .as_bool() + .unwrap_or_default(), + blob_hash: blob_id.hash.clone(), + size: blob_id.section.as_ref()?.size as u32, + vacation_response: VacationResponse::try_from_legacy(legacy), + }) + } +} + +impl TryFromLegacy for VacationResponse { + fn try_from_legacy(legacy: Object) -> Option { + let vacation = VacationResponse { + from_date: legacy + .get(&Property::FromDate) + .as_date() + .map(|s| s.timestamp() as u64), + to_date: legacy + .get(&Property::ToDate) + .as_date() + .map(|s| s.timestamp() as u64), + subject: legacy + .get(&Property::Name) + .as_string() + .map(|s| s.to_string()), + text_body: legacy + .get(&Property::TextBody) + .as_string() + .map(|s| s.to_string()), + html_body: legacy + .get(&Property::HtmlBody) + .as_string() + .map(|s| s.to_string()), + }; + + if vacation.from_date.is_some() + || vacation.to_date.is_some() + || vacation.subject.is_some() + || vacation.text_body.is_some() + || vacation.html_body.is_some() + { + Some(vacation) + } else { + None + } + } +} diff --git a/crates/migration/src/submission.rs b/crates/migration/src/submission.rs new file mode 100644 index 00000000..3aa335cf --- /dev/null +++ b/crates/migration/src/submission.rs @@ -0,0 +1,259 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use super::object::Object; +use crate::object::FromLegacy; +use common::Server; +use email::submission::{ + Address, Delivered, DeliveryStatus, EmailSubmission, Envelope, UndoStatus, +}; +use jmap_proto::types::{collection::Collection, property::Property, value::Value}; +use store::{ + SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_INDEXES, Serialize, SerializeInfallible, + U64_LEN, ValueKey, + write::{ + AlignedBytes, AnyKey, Archive, Archiver, BatchBuilder, ValueClass, key::KeySerializer, + }, +}; +use trc::AddContext; +use utils::map::vec_map::VecMap; + +pub(crate) async fn migrate_email_submissions( + server: &Server, + account_id: u32, +) -> trc::Result { + // Obtain email ids + let email_submission_ids = server + .get_document_ids(account_id, Collection::EmailSubmission) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_email_submissions = email_submission_ids.len(); + if num_email_submissions == 0 { + return Ok(0); + } + let mut did_migrate = false; + + // Delete indexes + for subspace in [SUBSPACE_INDEXES, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT] { + server + .store() + .delete_range( + AnyKey { + subspace, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::EmailSubmission)) + .finalize(), + }, + AnyKey { + subspace, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::EmailSubmission)) + .write(&[u8::MAX; 16][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + } + + for email_submission_id in email_submission_ids { + match server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::EmailSubmission.into(), + document_id: email_submission_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + { + Ok(Some(legacy)) => { + let es = EmailSubmission::from_legacy(legacy); + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::EmailSubmission) + .update_document(email_submission_id) + .index(Property::UndoStatus, es.undo_status.as_index()) + .index(Property::EmailId, es.email_id.serialize()) + .index(Property::ThreadId, es.thread_id.serialize()) + .index(Property::IdentityId, es.identity_id.serialize()) + .index(Property::SendAt, es.send_at.serialize()) + .set( + Property::Value, + Archiver::new(es).serialize().caused_by(trc::location!())?, + ); + did_migrate = true; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Ok(None) => (), + Err(err) => { + if server + .store() + .get_value::>(ValueKey { + account_id, + collection: Collection::EmailSubmission.into(), + document_id: email_submission_id, + class: ValueClass::Property(Property::Value.into()), + }) + .await + .is_err() + { + return Err(err + .account_id(account_id) + .document_id(email_submission_id) + .caused_by(trc::location!())); + } + } + } + } + + // Increment document id counter + if did_migrate { + server + .store() + .assign_document_ids( + account_id, + Collection::EmailSubmission, + num_email_submissions + 1, + ) + .await + .caused_by(trc::location!())?; + Ok(num_email_submissions) + } else { + Ok(0) + } +} + +impl FromLegacy for EmailSubmission { + fn from_legacy(legacy: Object) -> Self { + EmailSubmission { + email_id: legacy.get(&Property::EmailId).as_uint().unwrap_or_default() as u32, + thread_id: legacy + .get(&Property::ThreadId) + .as_uint() + .unwrap_or_default() as u32, + identity_id: legacy + .get(&Property::IdentityId) + .as_uint() + .unwrap_or_default() as u32, + send_at: legacy + .get(&Property::SentAt) + .as_date() + .map(|s| s.timestamp() as u64) + .unwrap_or_default(), + queue_id: legacy.get(&Property::MessageId).as_uint(), + undo_status: legacy + .get(&Property::UndoStatus) + .as_string() + .and_then(UndoStatus::parse) + .unwrap_or(UndoStatus::Final), + envelope: convert_envelope(legacy.get(&Property::Envelope)), + delivery_status: convert_delivery_status(legacy.get(&Property::DeliveryStatus)), + } + } +} + +fn convert_delivery_status(value: &Value) -> VecMap { + let mut status = VecMap::new(); + if let Value::List(list) = value { + for value in list { + if let Value::Object(obj) = value { + for (k, v) in obj.0.iter() { + if let (Property::_T(k), Value::Object(v)) = (k, v) { + let mut delivery_status = DeliveryStatus { + smtp_reply: String::new(), + delivered: Delivered::Unknown, + displayed: false, + }; + + for (property, value) in &v.0 { + match (property, value) { + (Property::Delivered, Value::Text(v)) => match v.as_str() { + "queued" => delivery_status.delivered = Delivered::Queued, + "yes" => delivery_status.delivered = Delivered::Yes, + "unknown" => delivery_status.delivered = Delivered::Unknown, + "no" => delivery_status.delivered = Delivered::No, + _ => {} + }, + (Property::SmtpReply, Value::Text(v)) => { + delivery_status.smtp_reply = v.to_string(); + } + + _ => {} + } + } + + status.append(k.to_string(), delivery_status); + } + } + } + } + } + status +} + +fn convert_envelope(value: &Value) -> Envelope { + let mut envelope = Envelope { + mail_from: Default::default(), + rcpt_to: vec![], + }; + + if let Value::Object(obj) = value { + for (property, value) in &obj.0 { + match (property, value) { + (Property::MailFrom, _) => { + envelope.mail_from = convert_envelope_address(value).unwrap_or_default(); + } + (Property::RcptTo, Value::List(value)) => { + for addr in value { + if let Some(addr) = convert_envelope_address(addr) { + envelope.rcpt_to.push(addr); + } + } + } + _ => {} + } + } + } + + envelope +} + +fn convert_envelope_address(envelope: &Value) -> Option
{ + if let Value::Object(envelope) = envelope { + if let (Value::Text(email), Value::Object(params)) = ( + envelope.get(&Property::Email), + envelope.get(&Property::Parameters), + ) { + let mut addr = Address { + email: email.to_string(), + parameters: None, + }; + for (k, v) in params.0.iter() { + if let Property::_T(k) = &k { + if !k.is_empty() { + let k = k.to_string(); + let v = v.as_string().map(|s| s.to_string()); + + addr.parameters.get_or_insert_default().append(k, v); + } + } + } + return Some(addr); + } + } + + None +} diff --git a/crates/migration/src/threads.rs b/crates/migration/src/threads.rs new file mode 100644 index 00000000..1b4f197c --- /dev/null +++ b/crates/migration/src/threads.rs @@ -0,0 +1,58 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use jmap_proto::types::collection::Collection; +use store::{ + SUBSPACE_BITMAP_ID, U64_LEN, + write::{AnyKey, key::KeySerializer}, +}; +use trc::AddContext; + +pub(crate) async fn migrate_threads(server: &Server, account_id: u32) -> trc::Result { + // Obtain email ids + let thread_ids = server + .get_document_ids(account_id, Collection::Thread) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_threads = thread_ids.len(); + if num_threads == 0 { + return Ok(0); + } + + // Delete threads + server + .store() + .delete_range( + AnyKey { + subspace: SUBSPACE_BITMAP_ID, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Thread)) + .finalize(), + }, + AnyKey { + subspace: SUBSPACE_BITMAP_ID, + key: KeySerializer::new(U64_LEN) + .write(account_id) + .write(u8::from(Collection::Thread)) + .write(&[u8::MAX; 16][..]) + .finalize(), + }, + ) + .await + .caused_by(trc::location!())?; + + // Increment document id counter + server + .store() + .assign_document_ids(account_id, Collection::Thread, num_threads + 1) + .await + .caused_by(trc::location!())?; + + Ok(num_threads) +} diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index a76e917d..26cfeac4 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -24,7 +24,7 @@ mail-auth = { version = "0.7", features = ["rkyv"] } mail-send = { version = "0.5", default-features = false, features = ["cram-md5", "ring", "tls12"] } mail-parser = { version = "0.11", features = ["full_encoding"] } mail-builder = { version = "0.4" } -smtp-proto = { version = "0.1.6", features = ["rkyv"] } +smtp-proto = { version = "0.1.6", features = ["rkyv", "serde"] } sieve-rs = { version = "0.7", features = ["rkyv"] } ahash = { version = "0.8" } rustls = { version = "0.23.5", default-features = false, features = ["std", "ring", "tls12"] } diff --git a/crates/smtp/src/queue/mod.rs b/crates/smtp/src/queue/mod.rs index fa96c122..7de546db 100644 --- a/crates/smtp/src/queue/mod.rs +++ b/crates/smtp/src/queue/mod.rs @@ -25,7 +25,7 @@ pub mod throttle; pub type QueueId = u64; -#[derive(Debug, Clone, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)] +#[derive(Debug, Clone, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Deserialize)] pub struct Schedule { pub due: u64, pub inner: T, @@ -46,7 +46,16 @@ pub enum MessageSource { Autogenerated, } -#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Clone, PartialEq, Eq)] +#[derive( + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + Debug, + Clone, + PartialEq, + Eq, + serde::Deserialize, +)] pub struct Message { pub queue_id: QueueId, pub created: u64, @@ -66,6 +75,7 @@ pub struct Message { pub quota_keys: Vec, #[rkyv(with = rkyv::with::Skip)] + #[serde(skip)] pub span_id: u64, } @@ -75,13 +85,31 @@ impl SerializedVersion for Message { } } -#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Clone, PartialEq, Eq)] +#[derive( + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + Debug, + Clone, + PartialEq, + Eq, + serde::Deserialize, +)] pub enum QuotaKey { Size { key: Vec, id: u64 }, Count { key: Vec, id: u64 }, } -#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Clone, PartialEq, Eq)] +#[derive( + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + Debug, + Clone, + PartialEq, + Eq, + serde::Deserialize, +)] pub struct Domain { pub domain: String, pub retry: Schedule, @@ -90,7 +118,16 @@ pub struct Domain { pub status: Status<(), Error>, } -#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug, Clone, PartialEq, Eq)] +#[derive( + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + Debug, + Clone, + PartialEq, + Eq, + serde::Deserialize, +)] pub struct Recipient { pub domain_idx: u32, pub address: String, @@ -125,13 +162,31 @@ pub enum Status { PermanentFailure(E), } -#[derive(Debug, Clone, PartialEq, Eq, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)] +#[derive( + Debug, + Clone, + PartialEq, + Eq, + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + serde::Deserialize, +)] pub struct HostResponse { pub hostname: T, pub response: Response, } -#[derive(Debug, Clone, PartialEq, Eq, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)] +#[derive( + Debug, + Clone, + PartialEq, + Eq, + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + serde::Deserialize, +)] pub enum Error { DnsError(String), UnexpectedResponse(HostResponse), @@ -145,7 +200,15 @@ pub enum Error { } #[derive( - Debug, Clone, PartialEq, Eq, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Default, + Debug, + Clone, + PartialEq, + Eq, + rkyv::Serialize, + rkyv::Deserialize, + rkyv::Archive, + Default, + serde::Deserialize, )] pub struct ErrorDetails { pub entity: String, diff --git a/crates/smtp/src/reporting/analysis.rs b/crates/smtp/src/reporting/analysis.rs index 4556f107..6610f817 100644 --- a/crates/smtp/src/reporting/analysis.rs +++ b/crates/smtp/src/reporting/analysis.rs @@ -41,7 +41,9 @@ struct ReportData<'x> { data: &'x [u8], } -#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Serialize)] +#[derive( + rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, serde::Serialize, serde::Deserialize, +)] pub struct IncomingReport { pub from: String, pub to: Vec, diff --git a/crates/store/src/config.rs b/crates/store/src/config.rs index 717ac139..4b7e3fdf 100644 --- a/crates/store/src/config.rs +++ b/crates/store/src/config.rs @@ -4,14 +4,11 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::sync::Arc; - -use utils::config::{Config, cron::SimpleCron, utils::ParseValue}; - use crate::{ - BlobStore, CompressionAlgo, InMemoryStore, PubSubStore, PurgeSchedule, PurgeStore, Store, - Stores, backend::fs::FsStore, + BlobStore, CompressionAlgo, InMemoryStore, PurgeSchedule, PurgeStore, Store, Stores, + backend::fs::FsStore, }; +use utils::config::{Config, cron::SimpleCron, utils::ParseValue}; #[cfg(feature = "enterprise")] enum CompositeStore { @@ -208,20 +205,22 @@ impl Stores { "redis" => { if let Some(db) = crate::backend::redis::RedisStore::open(config, prefix) .await - .map(Arc::new) + .map(std::sync::Arc::new) { self.in_memory_stores .insert(store_id.clone(), InMemoryStore::Redis(db.clone())); - self.pubsub_stores.insert(store_id, PubSubStore::Redis(db)); + self.pubsub_stores + .insert(store_id, crate::PubSubStore::Redis(db)); } } #[cfg(feature = "nats")] "nats" => { if let Some(db) = crate::backend::nats::NatsStore::open(config, prefix) .await - .map(Arc::new) + .map(std::sync::Arc::new) { - self.pubsub_stores.insert(store_id, PubSubStore::Nats(db)); + self.pubsub_stores + .insert(store_id, crate::PubSubStore::Nats(db)); } } #[cfg(feature = "enterprise")] diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index a5cabd8b..2b73df14 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -140,6 +140,10 @@ impl HttpLimitResponse for Response { pub struct Semver(u64); impl Semver { + pub fn current() -> Self { + env!("CARGO_PKG_VERSION").try_into().unwrap() + } + pub fn new(major: u16, minor: u16, patch: u16) -> Self { let mut version: u64 = 0; version |= (major as u64) << 32; diff --git a/tests/Cargo.toml b/tests/Cargo.toml index ae7a0092..0c35620d 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -39,6 +39,7 @@ smtp = { path = "../crates/smtp", features = ["test_mode", "enterprise"] } common = { path = "../crates/common", features = ["test_mode", "enterprise"] } email = { path = "../crates/email", features = ["test_mode", "enterprise"] } spam-filter = { path = "../crates/spam-filter", features = ["test_mode", "enterprise"] } +migration = { path = "../crates/migration", features = ["test_mode", "enterprise"] } trc = { path = "../crates/trc" } managesieve = { path = "../crates/managesieve", features = ["test_mode", "enterprise"] } smtp-proto = { version = "0.1" }