diff --git a/Cargo.lock b/Cargo.lock index b036baaa..d15aea81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -895,6 +895,21 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "calcard" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "720e412adf25f179f643b0753108cb308b812f82e1d34131c06b015c806e3f3c" +dependencies = [ + "ahash", + "chrono", + "chrono-tz", + "hashify", + "mail-builder", + "mail-parser", + "rkyv", +] + [[package]] name = "calcard" version = "0.3.1" @@ -1165,7 +1180,7 @@ dependencies = [ "base64 0.22.1", "bincode 2.0.1", "biscuit", - "calcard", + "calcard 0.3.1", "chrono", "compact_str", "decancer", @@ -1700,7 +1715,7 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" name = "dav" version = "0.14.0" dependencies = [ - "calcard", + "calcard 0.3.1", "chrono", "common", "compact_str", @@ -1722,7 +1737,7 @@ dependencies = [ name = "dav-proto" version = "0.14.0" dependencies = [ - "calcard", + "calcard 0.3.1", "chrono", "compact_str", "hashify", @@ -2776,7 +2791,7 @@ name = "groupware" version = "0.14.0" dependencies = [ "ahash", - "calcard", + "calcard 0.3.1", "chrono", "common", "compact_str", @@ -3790,7 +3805,7 @@ dependencies = [ "aes-gcm-siv", "async-stream", "base64 0.22.1", - "calcard", + "calcard 0.3.1", "chrono", "common", "compact_str", @@ -3897,7 +3912,7 @@ name = "jmap_proto" version = "0.14.0" dependencies = [ "ahash", - "calcard", + "calcard 0.3.1", "compact_str", "hashify", "jmap-tools", @@ -4459,7 +4474,8 @@ version = "0.14.0" dependencies = [ "base64 0.22.1", "bincode 1.3.3", - "calcard", + "calcard 0.1.3", + "calcard 0.3.1", "common", "compact_str", "dav-proto", @@ -4470,6 +4486,7 @@ dependencies = [ "mail-auth", "mail-parser", "nlp", + "proc_macros", "rkyv", "serde", "serde_json", @@ -7281,7 +7298,7 @@ dependencies = [ "aes-gcm", "aes-gcm-siv", "base64 0.22.1", - "calcard", + "calcard 0.3.1", "chrono", "common", "compact_str", @@ -7948,7 +7965,7 @@ dependencies = [ "base64 0.22.1", "biscuit", "bytes", - "calcard", + "calcard 0.3.1", "chrono", "common", "compact_str", diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 3f5ff347..b96925aa 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -87,10 +87,11 @@ Schema history: 1 - v0.12.0 2 - v0.12.4 3 - v0.13.0 +4 - v0.14.0 */ -pub const DATABASE_SCHEMA_VERSION: u32 = 3; +pub const DATABASE_SCHEMA_VERSION: u32 = 4; pub const LONG_1D_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24); pub const LONG_1Y_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365); diff --git a/crates/migration/Cargo.toml b/crates/migration/Cargo.toml index e8785e9e..83351b99 100644 --- a/crates/migration/Cargo.toml +++ b/crates/migration/Cargo.toml @@ -16,10 +16,12 @@ directory = { path = "../directory" } smtp = { path = "../smtp" } groupware = { path = "../groupware" } dav-proto = { path = "../dav-proto" } +proc_macros = { path = "../utils/proc-macros" } mail-parser = { version = "0.11", features = ["full_encoding"] } mail-auth = { version = "0.7.1", features = ["rkyv"] } sieve-rs = { version = "0.7", features = ["rkyv"] } -calcard = { version = "0.3", features = ["rkyv"] } +calcard_latest = { package = "calcard", version = "0.3", features = ["rkyv"] } +calcard_v01 = { package = "calcard", version = "0.1", features = ["rkyv"] } tokio = { version = "1.47", features = ["net", "macros"] } serde = { version = "1.0", features = ["derive"]} serde_json = "1.0" diff --git a/crates/migration/src/addressbook_v2.rs b/crates/migration/src/addressbook_v2.rs new file mode 100644 index 00000000..d5b34e4a --- /dev/null +++ b/crates/migration/src/addressbook_v2.rs @@ -0,0 +1,101 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use groupware::contact::{AddressBook, AddressBookPreferences}; +use store::{ + Serialize, + write::{Archiver, BatchBuilder, serialize::rkyv_deserialize}, +}; +use trc::AddContext; +use types::{acl::AclGrant, collection::Collection, dead_property::DeadProperty, field::Field}; + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +#[rkyv(derive(Debug))] +pub struct AddressBookV2 { + pub name: String, + pub display_name: Option, + pub description: Option, + pub sort_order: u32, + pub is_default: bool, + pub subscribers: Vec, + pub dead_properties: DeadProperty, + pub acls: Vec, + pub created: i64, + pub modified: i64, +} + +pub(crate) async fn migrate_addressbook_v013(server: &Server, account_id: u32) -> trc::Result { + let document_ids = server + .get_document_ids(account_id, Collection::AddressBook) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + if document_ids.is_empty() { + return Ok(0); + } + let mut num_migrated = 0; + + for document_id in document_ids.iter() { + let Some(archive) = server + .get_archive(account_id, Collection::AddressBook, document_id) + .await + .caused_by(trc::location!())? + else { + continue; + }; + + match archive.unarchive_untrusted::() { + Ok(book) => { + let book = rkyv_deserialize::<_, AddressBookV2>(book).unwrap(); + let new_book = AddressBook { + name: book.name, + preferences: vec![AddressBookPreferences { + account_id, + name: book + .display_name + .unwrap_or_else(|| "Address Book".to_string()), + description: book.description, + sort_order: book.sort_order, + }], + subscribers: book.subscribers, + dead_properties: book.dead_properties, + acls: book.acls, + created: book.created, + modified: book.modified, + }; + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::AddressBook) + .update_document(document_id) + .set( + Field::ARCHIVE, + Archiver::new(new_book) + .serialize() + .caused_by(trc::location!())?, + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + num_migrated += 1; + } + Err(err) => { + if let Err(err_) = archive.unarchive_untrusted::() { + trc::error!(err_.caused_by(trc::location!())); + return Err(err.caused_by(trc::location!())); + } + } + } + } + + Ok(num_migrated) +} diff --git a/crates/migration/src/calendar_v2.rs b/crates/migration/src/calendar_v2.rs new file mode 100644 index 00000000..cf291fdf --- /dev/null +++ b/crates/migration/src/calendar_v2.rs @@ -0,0 +1,144 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use groupware::calendar::{Calendar, CalendarPreferences, Timezone}; +use store::{ + Serialize, + write::{Archiver, BatchBuilder, serialize::rkyv_deserialize}, +}; +use trc::AddContext; +use types::{acl::AclGrant, collection::Collection, dead_property::DeadProperty, field::Field}; + +use crate::event_v2::migrate_icalendar_v02; + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct CalendarV2 { + pub name: String, + pub preferences: Vec, + pub default_alerts: Vec, + pub acls: Vec, + pub dead_properties: DeadProperty, + pub created: i64, + pub modified: i64, +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct CalendarPreferencesV2 { + pub account_id: u32, + pub name: String, + pub description: Option, + pub sort_order: u32, + pub color: Option, + pub flags: u16, + pub time_zone: TimezoneV2, +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub enum TimezoneV2 { + IANA(u16), + Custom(calcard_v01::icalendar::ICalendar), + #[default] + Default, +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct DefaultAlertV2 { + pub account_id: u32, + pub id: String, + pub alert: calcard_v01::icalendar::ICalendar, + pub with_time: bool, +} + +pub(crate) async fn migrate_calendar_v013(server: &Server, account_id: u32) -> trc::Result { + let document_ids = server + .get_document_ids(account_id, Collection::Calendar) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + if document_ids.is_empty() { + return Ok(0); + } + let mut num_migrated = 0; + + for document_id in document_ids.iter() { + let Some(archive) = server + .get_archive(account_id, Collection::Calendar, document_id) + .await + .caused_by(trc::location!())? + else { + continue; + }; + + match archive.unarchive_untrusted::() { + Ok(calendar) => { + let calendar = rkyv_deserialize::<_, CalendarV2>(calendar).unwrap(); + let new_calendar = Calendar { + name: calendar.name, + preferences: calendar + .preferences + .into_iter() + .map(|pref| CalendarPreferences { + account_id: pref.account_id, + name: pref.name, + description: pref.description, + sort_order: pref.sort_order, + color: pref.color, + flags: 0, + time_zone: match pref.time_zone { + TimezoneV2::IANA(tzid) => Timezone::IANA(tzid), + TimezoneV2::Custom(tz) => { + Timezone::Custom(migrate_icalendar_v02(tz)) + } + TimezoneV2::Default => Timezone::Default, + }, + default_alerts: Vec::new(), + }) + .collect(), + acls: calendar.acls, + supported_components: 0, + dead_properties: calendar.dead_properties, + created: calendar.created, + modified: calendar.modified, + }; + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::Calendar) + .update_document(document_id) + .set( + Field::ARCHIVE, + Archiver::new(new_calendar) + .serialize() + .caused_by(trc::location!())?, + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + num_migrated += 1; + } + Err(err) => { + if let Err(err_) = archive.unarchive_untrusted::() { + trc::error!(err_.caused_by(trc::location!())); + return Err(err.caused_by(trc::location!())); + } + } + } + } + + Ok(num_migrated) +} diff --git a/crates/migration/src/contact_v2.rs b/crates/migration/src/contact_v2.rs new file mode 100644 index 00000000..39ed80e1 --- /dev/null +++ b/crates/migration/src/contact_v2.rs @@ -0,0 +1,89 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::{DavName, Server}; +use groupware::contact::ContactCard; +use store::{ + Serialize, + write::{Archiver, BatchBuilder, serialize::rkyv_deserialize}, +}; +use trc::AddContext; +use types::{collection::Collection, dead_property::DeadProperty, field::Field}; + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct ContactCardV2 { + pub names: Vec, + pub display_name: Option, + pub card: calcard_v01::vcard::VCard, + pub dead_properties: DeadProperty, + pub created: i64, + pub modified: i64, + pub size: u32, +} + +pub(crate) async fn migrate_contacts_v013(server: &Server, account_id: u32) -> trc::Result { + let document_ids = server + .get_document_ids(account_id, Collection::ContactCard) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + let mut num_migrated = 0; + + for document_id in document_ids.iter() { + let Some(archive) = server + .get_archive(account_id, Collection::ContactCard, document_id) + .await + .caused_by(trc::location!())? + else { + continue; + }; + + match archive.unarchive_untrusted::() { + Ok(contact) => { + let contact = rkyv_deserialize::<_, ContactCardV2>(contact).unwrap(); + let new_contact = ContactCard { + names: contact.names, + display_name: contact.display_name, + dead_properties: contact.dead_properties, + size: contact.size, + created: contact.created, + modified: contact.modified, + card: calcard_latest::vcard::VCard::parse(contact.card.to_string()) + .unwrap_or_default(), + }; + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::ContactCard) + .update_document(document_id) + .set( + Field::ARCHIVE, + Archiver::new(new_contact) + .serialize() + .caused_by(trc::location!())?, + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + num_migrated += 1; + } + Err(err) => { + if let Err(err_) = archive.unarchive_untrusted::() { + trc::error!(err_.caused_by(trc::location!())); + return Err(err.caused_by(trc::location!())); + } + } + } + } + + Ok(num_migrated) +} diff --git a/crates/migration/src/calendar.rs b/crates/migration/src/event_v1.rs similarity index 93% rename from crates/migration/src/calendar.rs rename to crates/migration/src/event_v1.rs index 2fd61e52..302f8c0b 100644 --- a/crates/migration/src/calendar.rs +++ b/crates/migration/src/event_v1.rs @@ -4,7 +4,6 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use calcard::{common::timezone::Tz, icalendar::ICalendar}; use common::{DavName, Server}; use groupware::calendar::{AlarmDelta, CalendarEvent, CalendarEventData, ComponentTimeRange}; use store::{ @@ -15,6 +14,8 @@ use store::{ use trc::AddContext; use types::{collection::Collection, dead_property::DeadProperty, field::Field}; +use crate::event_v2::migrate_icalendar_v02; + #[derive( rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, )] @@ -35,14 +36,14 @@ pub struct CalendarEventV1 { )] pub struct UserProperties { pub account_id: u32, - pub properties: ICalendar, + pub properties: calcard_v01::icalendar::ICalendar, } #[derive( rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, )] pub struct CalendarEventDataV1 { - pub event: ICalendar, + pub event: calcard_v01::icalendar::ICalendar, pub time_ranges: Box<[ComponentTimeRange]>, pub alarms: Box<[AlarmV1]>, pub base_offset: i64, @@ -59,7 +60,7 @@ pub struct AlarmV1 { pub alarms: Box<[AlarmDelta]>, } -pub(crate) async fn migrate_calendar_events(server: &Server) -> trc::Result<()> { +pub(crate) async fn migrate_calendar_events_v012(server: &Server) -> trc::Result<()> { // Obtain email ids let account_ids = server .get_document_ids(u32::MAX, Collection::Principal) @@ -103,8 +104,8 @@ pub(crate) async fn migrate_calendar_events(server: &Server) -> trc::Result<()> names: event.names, display_name: event.display_name, data: CalendarEventData::new( - event.data.event, - Tz::Floating, + migrate_icalendar_v02(event.data.event), + calcard_latest::common::timezone::Tz::Floating, server.core.groupware.max_ical_instances, &mut next_email_alarm, ), diff --git a/crates/migration/src/event_v2.rs b/crates/migration/src/event_v2.rs new file mode 100644 index 00000000..e5e78cdc --- /dev/null +++ b/crates/migration/src/event_v2.rs @@ -0,0 +1,212 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::{DavName, Server}; +use groupware::calendar::{ + Alarm, CalendarEvent, CalendarEventData, CalendarEventNotification, ComponentTimeRange, +}; +use store::{ + Serialize, + write::{Archiver, BatchBuilder, serialize::rkyv_deserialize}, +}; +use trc::AddContext; +use types::{collection::Collection, dead_property::DeadProperty, field::Field}; + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct CalendarEventV2 { + pub names: Vec, + pub display_name: Option, + pub data: CalendarEventDataV2, + pub user_properties: Vec, + pub flags: u16, + pub dead_properties: DeadProperty, + pub size: u32, + pub created: i64, + pub modified: i64, + pub schedule_tag: Option, +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct UserPropertiesV2 { + pub account_id: u32, + pub properties: calcard_v01::icalendar::ICalendar, +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct CalendarEventDataV2 { + pub event: calcard_v01::icalendar::ICalendar, + pub time_ranges: Box<[ComponentTimeRange]>, + pub alarms: Box<[Alarm]>, + pub base_offset: i64, + pub base_time_utc: u32, + pub duration: u32, +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +pub struct CalendarEventNotificationV2 { + pub itip: calcard_v01::icalendar::ICalendar, + pub event_id: Option, + pub flags: u16, + pub size: u32, + pub created: i64, + pub modified: i64, +} + +pub(crate) async fn migrate_calendar_events_v013( + server: &Server, + account_id: u32, +) -> trc::Result { + let document_ids = server + .get_document_ids(account_id, Collection::CalendarEvent) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + let mut num_migrated = 0; + + for document_id in document_ids.iter() { + let Some(archive) = server + .get_archive(account_id, Collection::CalendarEvent, document_id) + .await + .caused_by(trc::location!())? + else { + continue; + }; + + match archive.unarchive_untrusted::() { + Ok(event) => { + let event = rkyv_deserialize::<_, CalendarEventV2>(event).unwrap(); + let new_event = CalendarEvent { + names: event.names, + display_name: event.display_name, + data: CalendarEventData { + event: migrate_icalendar_v02(event.data.event), + time_ranges: event.data.time_ranges, + alarms: event.data.alarms, + base_offset: event.data.base_offset, + base_time_utc: event.data.base_time_utc, + duration: event.data.duration, + }, + preferences: Default::default(), + flags: event.flags, + dead_properties: event.dead_properties, + size: event.size, + created: event.created, + modified: event.modified, + schedule_tag: None, + }; + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::CalendarEvent) + .update_document(document_id) + .set( + Field::ARCHIVE, + Archiver::new(new_event) + .serialize() + .caused_by(trc::location!())?, + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + num_migrated += 1; + } + Err(err) => { + if let Err(err_) = archive.unarchive_untrusted::() { + trc::error!(err_.caused_by(trc::location!())); + return Err(err.caused_by(trc::location!())); + } + } + } + } + + Ok(num_migrated) +} + +pub(crate) async fn migrate_calendar_scheduling_v013( + server: &Server, + account_id: u32, +) -> trc::Result { + let document_ids = server + .get_document_ids(account_id, Collection::CalendarEventNotification) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + let mut num_migrated = 0; + + for document_id in document_ids.iter() { + let Some(archive) = server + .get_archive( + account_id, + Collection::CalendarEventNotification, + document_id, + ) + .await + .caused_by(trc::location!())? + else { + continue; + }; + + match archive.unarchive_untrusted::() { + Ok(event) => { + let event = rkyv_deserialize::<_, CalendarEventNotificationV2>(event).unwrap(); + let new_event = CalendarEventNotification { + event: migrate_icalendar_v02(event.itip), + event_id: event.event_id, + changed_by: Default::default(), + flags: 0, + size: event.size, + created: event.created, + modified: event.modified, + }; + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::CalendarEventNotification) + .update_document(document_id) + .set( + Field::ARCHIVE, + Archiver::new(new_event) + .serialize() + .caused_by(trc::location!())?, + ); + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + num_migrated += 1; + } + Err(err) => { + if let Err(err_) = archive.unarchive_untrusted::() { + trc::error!(err_.caused_by(trc::location!())); + return Err(err.caused_by(trc::location!())); + } + } + } + } + + Ok(num_migrated) +} + +pub(crate) fn migrate_icalendar_v02( + ical: calcard_v01::icalendar::ICalendar, +) -> calcard_latest::icalendar::ICalendar { + calcard_latest::icalendar::ICalendar::parse(ical.to_string()).unwrap_or_default() +} diff --git a/crates/migration/src/lib.rs b/crates/migration/src/lib.rs index 8d86360f..525904bd 100644 --- a/crates/migration/src/lib.rs +++ b/crates/migration/src/lib.rs @@ -5,71 +5,58 @@ */ use crate::{ - calendar::migrate_calendar_events, queue::{migrate_queue_v011, migrate_queue_v012}, - tasks::migrate_tasks_v011, + v011::migrate_v0_11, + v012::migrate_v0_12, + v013::migrate_v0_13, }; -use changelog::reset_changelog; -use common::{ - DATABASE_SCHEMA_VERSION, KV_LOCK_HOUSEKEEPER, Server, manager::boot::DEFAULT_SETTINGS, -}; -use principal::{migrate_principal, migrate_principals}; -use report::migrate_reports; +use common::{DATABASE_SCHEMA_VERSION, Server, manager::boot::DEFAULT_SETTINGS}; use std::time::Duration; use store::{ Deserialize, IterateParams, SUBSPACE_PROPERTY, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_REPORT_IN, SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SerializeInfallible, U32_LEN, Value, ValueKey, - dispatch::{DocumentSet, lookup::KeyValue}, - rand::{self, seq::SliceRandom}, + dispatch::DocumentSet, write::{AnyClass, AnyKey, BatchBuilder, ValueClass, key::DeserializeBigEndian}, }; use trc::AddContext; use types::collection::Collection; -pub mod calendar; +pub mod addressbook_v2; +pub mod calendar_v2; pub mod changelog; +pub mod contact_v2; pub mod email; pub mod encryption; +pub mod event_v1; +pub mod event_v2; pub mod identity; pub mod mailbox; pub mod object; -pub mod principal; -pub mod push; +pub mod principal_v1; +pub mod principal_v2; +pub mod push_v1; +pub mod push_v2; pub mod queue; pub mod report; -pub mod sieve; +pub mod sieve_v1; +pub mod sieve_v2; pub mod submission; pub mod tasks; pub mod threads; +pub mod v011; +pub mod v012; +pub mod v013; const LOCK_WAIT_TIME_ACCOUNT: u64 = 3 * 60; const LOCK_WAIT_TIME_CORE: u64 = 5 * 60; const LOCK_RETRY_TIME: Duration = Duration::from_secs(30); -/* - -pub struct AddressBook { - pub name: String, - pub display_name: Option, - pub description: Option, - pub sort_order: u32, - pub is_default: bool, - pub subscribers: Vec, - pub dead_properties: DeadProperty, - pub acls: Vec, - pub created: i64, - pub modified: i64, -} - - -*/ - pub async fn try_migrate(server: &Server) -> trc::Result<()> { if let Some(version) = std::env::var("FORCE_MIGRATE_QUEUE") .ok() .and_then(|s| s.parse::().ok()) { - if version == 12 { + if version == 12 || version <= 2 { migrate_queue_v012(server) .await .caused_by(trc::location!())?; @@ -79,15 +66,8 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> { .caused_by(trc::location!())?; } return Ok(()); - } else if let Some(account_id) = std::env::var("FORCE_MIGRATE_ACCOUNT") - .ok() - .and_then(|s| s.parse().ok()) - { - migrate_principal(server, account_id) - .await - .caused_by(trc::location!())?; - return Ok(()); - } else if let Some(version) = std::env::var("FORCE_MIGRATE") + } + if let Some(version) = std::env::var("FORCE_MIGRATE") .ok() .and_then(|s| s.parse::().ok()) { @@ -96,11 +76,16 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> { migrate_v0_12(server, true) .await .caused_by(trc::location!())?; + migrate_v0_13(server).await.caused_by(trc::location!())?; } 2 => { migrate_v0_12(server, false) .await .caused_by(trc::location!())?; + migrate_v0_13(server).await.caused_by(trc::location!())?; + } + 3 => { + migrate_v0_13(server).await.caused_by(trc::location!())?; } _ => { panic!("Unknown migration version: {version}"); @@ -125,14 +110,20 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> { migrate_v0_12(server, true) .await .caused_by(trc::location!())?; + migrate_v0_13(server).await.caused_by(trc::location!())?; true } Some(2) => { migrate_v0_12(server, false) .await .caused_by(trc::location!())?; + migrate_v0_13(server).await.caused_by(trc::location!())?; true } + Some(3) => { + migrate_v0_13(server).await.caused_by(trc::location!())?; + false + } Some(version) => { panic!( "Unknown database schema version, expected {} or below, found {}", @@ -184,211 +175,6 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> { Ok(()) } -async fn migrate_v0_12(server: &Server, migrate_tasks: bool) -> trc::Result<()> { - let force_lock = std::env::var("FORCE_LOCK").is_ok(); - let in_memory = server.in_memory_store(); - - loop { - if force_lock - || in_memory - .try_lock( - KV_LOCK_HOUSEKEEPER, - b"migrate_core_lock", - LOCK_WAIT_TIME_CORE, - ) - .await - .caused_by(trc::location!())? - { - migrate_queue_v012(server) - .await - .caused_by(trc::location!())?; - - if migrate_tasks { - migrate_tasks_v011(server) - .await - .caused_by(trc::location!())?; - } - - in_memory - .remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock") - .await - .caused_by(trc::location!())?; - break; - } else { - trc::event!( - Server(trc::ServerEvent::Startup), - Details = format!("Migration lock busy, waiting 30 seconds.",) - ); - - tokio::time::sleep(LOCK_RETRY_TIME).await; - } - } - - if migrate_tasks { - migrate_calendar_events(server) - .await - .caused_by(trc::location!()) - } else { - Ok(()) - } -} - -async fn migrate_v0_11(server: &Server) -> trc::Result<()> { - let force_lock = std::env::var("FORCE_LOCK").is_ok(); - let in_memory = server.in_memory_store(); - let principal_ids; - - loop { - if force_lock - || in_memory - .try_lock( - KV_LOCK_HOUSEKEEPER, - b"migrate_core_lock", - LOCK_WAIT_TIME_CORE, - ) - .await - .caused_by(trc::location!())? - { - if in_memory - .key_get::<()>(KeyValue::<()>::build_key( - KV_LOCK_HOUSEKEEPER, - b"migrate_core_done", - )) - .await - .caused_by(trc::location!())? - .is_none() - { - migrate_queue_v011(server) - .await - .caused_by(trc::location!())?; - migrate_reports(server).await.caused_by(trc::location!())?; - reset_changelog(server).await.caused_by(trc::location!())?; - principal_ids = migrate_principals(server) - .await - .caused_by(trc::location!())?; - - in_memory - .key_set( - KeyValue::new( - KeyValue::<()>::build_key(KV_LOCK_HOUSEKEEPER, b"migrate_core_done"), - b"1".to_vec(), - ) - .expires(86400), - ) - .await - .caused_by(trc::location!())?; - } else { - principal_ids = server - .get_document_ids(u32::MAX, Collection::Principal) - .await - .caused_by(trc::location!())? - .unwrap_or_default(); - - trc::event!( - Server(trc::ServerEvent::Startup), - Details = format!("Migration completed by another node.",) - ); - } - - in_memory - .remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock") - .await - .caused_by(trc::location!())?; - break; - } else { - trc::event!( - Server(trc::ServerEvent::Startup), - Details = format!("Migration lock busy, waiting 30 seconds.",) - ); - - tokio::time::sleep(LOCK_RETRY_TIME).await; - } - } - - if !principal_ids.is_empty() { - let mut principal_ids = principal_ids.into_iter().collect::>(); - principal_ids.shuffle(&mut rand::rng()); - - loop { - let mut skipped_principal_ids = Vec::new(); - let mut num_migrated = 0; - - for principal_id in principal_ids { - let lock_key = format!("migrate_{principal_id}_lock"); - let done_key = format!("migrate_{principal_id}_done"); - - if force_lock - || in_memory - .try_lock( - KV_LOCK_HOUSEKEEPER, - lock_key.as_bytes(), - LOCK_WAIT_TIME_ACCOUNT, - ) - .await - .caused_by(trc::location!())? - { - if in_memory - .key_get::<()>(KeyValue::<()>::build_key( - KV_LOCK_HOUSEKEEPER, - done_key.as_bytes(), - )) - .await - .caused_by(trc::location!())? - .is_none() - { - migrate_principal(server, principal_id) - .await - .caused_by(trc::location!())?; - - num_migrated += 1; - - in_memory - .key_set( - KeyValue::new( - KeyValue::<()>::build_key( - KV_LOCK_HOUSEKEEPER, - done_key.as_bytes(), - ), - b"1".to_vec(), - ) - .expires(86400), - ) - .await - .caused_by(trc::location!())?; - } - - in_memory - .remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes()) - .await - .caused_by(trc::location!())?; - } else { - skipped_principal_ids.push(principal_id); - } - } - - if !skipped_principal_ids.is_empty() { - trc::event!( - Server(trc::ServerEvent::Startup), - Details = format!( - "Migrated {num_migrated} accounts and {} are locked by another node, waiting 30 seconds.", - skipped_principal_ids.len() - ) - ); - tokio::time::sleep(LOCK_RETRY_TIME).await; - principal_ids = skipped_principal_ids; - } else { - trc::event!( - Server(trc::ServerEvent::Startup), - Details = format!("Account migration completed.",) - ); - break; - } - } - } - - Ok(()) -} - async fn is_new_install(server: &Server) -> trc::Result { for subspace in [ SUBSPACE_QUEUE_MESSAGE, diff --git a/crates/migration/src/principal.rs b/crates/migration/src/principal_v1.rs similarity index 95% rename from crates/migration/src/principal.rs rename to crates/migration/src/principal_v1.rs index 91e302ea..510ab795 100644 --- a/crates/migration/src/principal.rs +++ b/crates/migration/src/principal_v1.rs @@ -4,6 +4,11 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use crate::{ + email::migrate_emails, encryption::migrate_encryption_params, identity::migrate_identities, + mailbox::migrate_mailboxes, push_v1::migrate_push_subscriptions_v011, + sieve_v1::migrate_sieve_v011, submission::migrate_email_submissions, threads::migrate_threads, +}; use common::Server; use directory::{ Permission, Principal, PrincipalData, ROLE_ADMIN, ROLE_USER, Type, @@ -22,13 +27,7 @@ use trc::AddContext; use types::collection::Collection; use utils::codec::leb128::Leb128Iterator; -use crate::{ - email::migrate_emails, encryption::migrate_encryption_params, identity::migrate_identities, - mailbox::migrate_mailboxes, push::migrate_push_subscriptions, sieve::migrate_sieve, - submission::migrate_email_submissions, threads::migrate_threads, -}; - -pub(crate) async fn migrate_principals(server: &Server) -> trc::Result { +pub(crate) async fn migrate_principals_v0_11(server: &Server) -> trc::Result { // Obtain email ids let principal_ids = server .get_document_ids(u32::MAX, Collection::Principal) @@ -53,7 +52,8 @@ pub(crate) async fn migrate_principals(server: &Server) -> trc::Result { - let principal = Principal::from_legacy(legacy); + let mut principal = Principal::from_legacy(legacy); + principal.sort(); let mut batch = BatchBuilder::new(); batch .with_account_id(u32::MAX) @@ -120,7 +120,7 @@ pub(crate) async fn migrate_principals(server: &Server) -> trc::Result trc::Result<()> { +pub(crate) async fn migrate_principal_v0_11(server: &Server, account_id: u32) -> trc::Result<()> { let start_time = Instant::now(); let num_emails = migrate_emails(server, account_id) .await @@ -131,10 +131,10 @@ pub(crate) async fn migrate_principal(server: &Server, account_id: u32) -> trc:: let num_params = migrate_encryption_params(server, account_id) .await .caused_by(trc::location!())?; - let num_subscriptions = migrate_push_subscriptions(server, account_id) + let num_subscriptions = migrate_push_subscriptions_v011(server, account_id) .await .caused_by(trc::location!())?; - let num_sieve = migrate_sieve(server, account_id) + let num_sieve = migrate_sieve_v011(server, account_id) .await .caused_by(trc::location!())?; let num_submissions = migrate_email_submissions(server, account_id) diff --git a/crates/migration/src/principal_v2.rs b/crates/migration/src/principal_v2.rs new file mode 100644 index 00000000..e702ed16 --- /dev/null +++ b/crates/migration/src/principal_v2.rs @@ -0,0 +1,539 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::time::Instant; + +use common::Server; +use directory::{Principal, PrincipalData, Type}; +use proc_macros::EnumMethods; +use store::{ + Serialize, ValueKey, + roaring::RoaringBitmap, + write::{AlignedBytes, Archive, Archiver, BatchBuilder, DirectoryClass, ValueClass}, +}; +use trc::AddContext; +use types::collection::Collection; + +use crate::{ + addressbook_v2::migrate_addressbook_v013, + calendar_v2::migrate_calendar_v013, + contact_v2::migrate_contacts_v013, + event_v2::{migrate_calendar_events_v013, migrate_calendar_scheduling_v013}, + push_v2::migrate_push_subscriptions_v013, + sieve_v2::migrate_sieve_v013, +}; + +pub(crate) async fn migrate_principals_v0_13(server: &Server) -> trc::Result { + // Obtain email ids + let principal_ids = server + .get_document_ids(u32::MAX, Collection::Principal) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_principals = principal_ids.len(); + if num_principals == 0 { + return Ok(principal_ids); + } + let mut num_migrated = 0; + + for principal_id in principal_ids.iter() { + match server + .store() + .get_value::>(ValueKey { + account_id: u32::MAX, + collection: Collection::Principal.into(), + document_id: principal_id, + class: ValueClass::Directory(DirectoryClass::Principal(principal_id)), + }) + .await + { + Ok(Some(legacy)) => match legacy.deserialize_untrusted::() { + Ok(old_principal) => { + let mut principal = Principal { + id: principal_id, + typ: old_principal.typ, + name: old_principal.name, + data: Vec::new(), + }; + + for secret in old_principal.secrets { + principal.data.push(PrincipalData::Secret(secret)); + } + + for (idx, email) in old_principal.emails.into_iter().enumerate() { + if idx == 0 { + principal.data.push(PrincipalData::PrimaryEmail(email)); + } else { + principal.data.push(PrincipalData::EmailAlias(email)); + } + } + + if let Some(description) = old_principal.description { + principal.data.push(PrincipalData::Description(description)); + } + + if let Some(quota) = old_principal.quota + && quota > 0 + { + principal.data.push(PrincipalData::DiskQuota(quota)); + } + + if let Some(tenant) = old_principal.tenant { + principal.data.push(PrincipalData::Tenant(tenant)); + } + + for item in old_principal.data { + match item { + PrincipalDataV2::MemberOf(items) => { + for item in items { + principal.data.push(PrincipalData::MemberOf(item)); + } + } + PrincipalDataV2::Roles(items) => { + for item in items { + principal.data.push(PrincipalData::Role(item)); + } + } + PrincipalDataV2::Lists(items) => { + for item in items { + principal.data.push(PrincipalData::List(item)); + } + } + PrincipalDataV2::Permissions(items) => { + for item in items { + principal.data.push(PrincipalData::Permission { + permission_id: item.permission.id(), + grant: item.grant, + }); + } + } + PrincipalDataV2::Picture(item) => { + principal.data.push(PrincipalData::Picture(item)); + } + PrincipalDataV2::ExternalMembers(items) => { + for item in items { + principal.data.push(PrincipalData::ExternalMember(item)); + } + } + PrincipalDataV2::Urls(items) => { + for item in items { + principal.data.push(PrincipalData::Url(item)); + } + } + PrincipalDataV2::PrincipalQuota(items) => { + for item in items { + principal.data.push(PrincipalData::DirectoryQuota { + quota: item.quota as u32, + typ: item.typ, + }); + } + } + PrincipalDataV2::Locale(item) => { + principal.data.push(PrincipalData::Locale(item)); + } + } + } + + principal.sort(); + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(u32::MAX) + .with_collection(Collection::Principal) + .update_document(principal_id); + + batch.set( + ValueClass::Directory(DirectoryClass::Principal(principal_id)), + Archiver::new(principal) + .serialize() + .caused_by(trc::location!())?, + ); + num_migrated += 1; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Err(_) => { + if let Err(err) = legacy.deserialize_untrusted::() { + return Err(err.account_id(principal_id).caused_by(trc::location!())); + } + } + }, + Ok(None) => (), + Err(err) => { + return Err(err.account_id(principal_id).caused_by(trc::location!())); + } + } + } + + if num_migrated > 0 { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migrated {num_migrated} principals",) + ); + } + + Ok(principal_ids) +} + +pub(crate) async fn migrate_principal_v0_13(server: &Server, account_id: u32) -> trc::Result<()> { + let start_time = Instant::now(); + let num_push = migrate_push_subscriptions_v013(server, account_id) + .await + .caused_by(trc::location!())?; + let num_sieve = migrate_sieve_v013(server, account_id) + .await + .caused_by(trc::location!())?; + let num_calendars = migrate_calendar_v013(server, account_id) + .await + .caused_by(trc::location!())?; + let num_events = migrate_calendar_events_v013(server, account_id) + .await + .caused_by(trc::location!())?; + let num_event_scheduling = migrate_calendar_scheduling_v013(server, account_id) + .await + .caused_by(trc::location!())?; + let num_books = migrate_addressbook_v013(server, account_id) + .await + .caused_by(trc::location!())?; + let num_contacts = migrate_contacts_v013(server, account_id) + .await + .caused_by(trc::location!())?; + + if num_sieve > 0 + || num_books > 0 + || num_contacts > 0 + || num_calendars > 0 + || num_events > 0 + || num_push > 0 + || num_event_scheduling > 0 + { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!( + "Migrated accountId {account_id} with {num_sieve} sieve scripts, {num_push} push subscriptions, {num_calendars} calendars, {num_events} calendar events, {num_event_scheduling} event scheduling, {num_books} address books and {num_contacts} contacts" + ), + Elapsed = start_time.elapsed() + ); + } + + Ok(()) +} + +#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)] +pub struct PrincipalV2 { + pub id: u32, + pub typ: Type, + pub name: String, + pub description: Option, + pub secrets: Vec, + pub emails: Vec, + pub quota: Option, + pub tenant: Option, + pub data: Vec, +} + +#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)] +pub enum PrincipalDataV2 { + MemberOf(Vec), + Roles(Vec), + Lists(Vec), + Permissions(Vec), + Picture(String), + ExternalMembers(Vec), + Urls(Vec), + PrincipalQuota(Vec), + Locale(String), +} + +#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)] +pub struct PrincipalQuotaV2 { + pub quota: u64, + pub typ: Type, +} + +#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)] +pub struct PermissionGrantV2 { + pub permission: PermissionV2, + pub grant: bool, +} + +#[derive( + rkyv::Archive, + rkyv::Deserialize, + rkyv::Serialize, + Debug, + Clone, + Copy, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + EnumMethods, +)] +#[serde(rename_all = "kebab-case")] +pub enum PermissionV2 { + // WARNING: add new ids at the end (TODO: use static ids) + + // Admin + Impersonate, + UnlimitedRequests, + UnlimitedUploads, + DeleteSystemFolders, + MessageQueueList, + MessageQueueGet, + MessageQueueUpdate, + MessageQueueDelete, + OutgoingReportList, + OutgoingReportGet, + OutgoingReportDelete, + IncomingReportList, + IncomingReportGet, + IncomingReportDelete, + SettingsList, + SettingsUpdate, + SettingsDelete, + SettingsReload, + IndividualList, + IndividualGet, + IndividualUpdate, + IndividualDelete, + IndividualCreate, + GroupList, + GroupGet, + GroupUpdate, + GroupDelete, + GroupCreate, + DomainList, + DomainGet, + DomainCreate, + DomainUpdate, + DomainDelete, + TenantList, + TenantGet, + TenantCreate, + TenantUpdate, + TenantDelete, + MailingListList, + MailingListGet, + MailingListCreate, + MailingListUpdate, + MailingListDelete, + RoleList, + RoleGet, + RoleCreate, + RoleUpdate, + RoleDelete, + PrincipalList, + PrincipalGet, + PrincipalCreate, + PrincipalUpdate, + PrincipalDelete, + BlobFetch, + PurgeBlobStore, + PurgeDataStore, + PurgeInMemoryStore, + PurgeAccount, + FtsReindex, + Undelete, + DkimSignatureCreate, + DkimSignatureGet, + SpamFilterUpdate, + WebadminUpdate, + LogsView, + SpamFilterTrain, + Restart, + TracingList, + TracingGet, + TracingLive, + MetricsList, + MetricsLive, + + // Generic + Authenticate, + AuthenticateOauth, + EmailSend, + EmailReceive, + + // Account Management + ManageEncryption, + ManagePasswords, + + // JMAP + JmapEmailGet, + JmapMailboxGet, + JmapThreadGet, + JmapIdentityGet, + JmapEmailSubmissionGet, + JmapPushSubscriptionGet, + JmapSieveScriptGet, + JmapVacationResponseGet, + JmapPrincipalGet, + JmapQuotaGet, + JmapBlobGet, + JmapEmailSet, + JmapMailboxSet, + JmapIdentitySet, + JmapEmailSubmissionSet, + JmapPushSubscriptionSet, + JmapSieveScriptSet, + JmapVacationResponseSet, + JmapEmailChanges, + JmapMailboxChanges, + JmapThreadChanges, + JmapIdentityChanges, + JmapEmailSubmissionChanges, + JmapQuotaChanges, + JmapEmailCopy, + JmapBlobCopy, + JmapEmailImport, + JmapEmailParse, + JmapEmailQueryChanges, + JmapMailboxQueryChanges, + JmapEmailSubmissionQueryChanges, + JmapSieveScriptQueryChanges, + JmapPrincipalQueryChanges, + JmapQuotaQueryChanges, + JmapEmailQuery, + JmapMailboxQuery, + JmapEmailSubmissionQuery, + JmapSieveScriptQuery, + JmapPrincipalQuery, + JmapQuotaQuery, + JmapSearchSnippet, + JmapSieveScriptValidate, + JmapBlobLookup, + JmapBlobUpload, + JmapEcho, + + // IMAP + ImapAuthenticate, + ImapAclGet, + ImapAclSet, + ImapMyRights, + ImapListRights, + ImapAppend, + ImapCapability, + ImapId, + ImapCopy, + ImapMove, + ImapCreate, + ImapDelete, + ImapEnable, + ImapExpunge, + ImapFetch, + ImapIdle, + ImapList, + ImapLsub, + ImapNamespace, + ImapRename, + ImapSearch, + ImapSort, + ImapSelect, + ImapExamine, + ImapStatus, + ImapStore, + ImapSubscribe, + ImapThread, + + // POP3 + Pop3Authenticate, + Pop3List, + Pop3Uidl, + Pop3Stat, + Pop3Retr, + Pop3Dele, + + // ManageSieve + SieveAuthenticate, + SieveListScripts, + SieveSetActive, + SieveGetScript, + SievePutScript, + SieveDeleteScript, + SieveRenameScript, + SieveCheckScript, + SieveHaveSpace, + + // API keys + ApiKeyList, + ApiKeyGet, + ApiKeyCreate, + ApiKeyUpdate, + ApiKeyDelete, + + // OAuth clients + OauthClientList, + OauthClientGet, + OauthClientCreate, + OauthClientUpdate, + OauthClientDelete, + + // OAuth client registration + OauthClientRegistration, + OauthClientOverride, + + AiModelInteract, + Troubleshoot, + SpamFilterClassify, + + // WebDAV permissions + DavSyncCollection, + DavExpandProperty, + + DavPrincipalAcl, + DavPrincipalList, + DavPrincipalMatch, + DavPrincipalSearch, + DavPrincipalSearchPropSet, + + DavFilePropFind, + DavFilePropPatch, + DavFileGet, + DavFileMkCol, + DavFileDelete, + DavFilePut, + DavFileCopy, + DavFileMove, + DavFileLock, + DavFileAcl, + + DavCardPropFind, + DavCardPropPatch, + DavCardGet, + DavCardMkCol, + DavCardDelete, + DavCardPut, + DavCardCopy, + DavCardMove, + DavCardLock, + DavCardAcl, + DavCardQuery, + DavCardMultiGet, + + DavCalPropFind, + DavCalPropPatch, + DavCalGet, + DavCalMkCol, + DavCalDelete, + DavCalPut, + DavCalCopy, + DavCalMove, + DavCalLock, + DavCalAcl, + DavCalQuery, + DavCalMultiGet, + DavCalFreeBusyQuery, + + CalendarAlarms, + CalendarSchedulingSend, + CalendarSchedulingReceive, + // WARNING: add new ids at the end (TODO: use static ids) +} diff --git a/crates/migration/src/push.rs b/crates/migration/src/push_v1.rs similarity index 64% rename from crates/migration/src/push.rs rename to crates/migration/src/push_v1.rs index ed5730b9..ae8baa95 100644 --- a/crates/migration/src/push.rs +++ b/crates/migration/src/push_v1.rs @@ -8,20 +8,23 @@ use super::object::Object; use crate::object::{FromLegacy, Property, Value}; use base64::{Engine, engine::general_purpose}; use common::Server; -use email::push::{Keys, PushSubscription}; +use email::push::{Keys, PushSubscription, PushSubscriptions}; use store::{ Serialize, ValueKey, - write::{AlignedBytes, Archive, Archiver, BatchBuilder, ValueClass}, + write::{Archiver, BatchBuilder, ValueClass}, }; use trc::AddContext; -use types::{collection::Collection, field::Field, type_state::DataType}; +use types::{ + collection::Collection, + field::{Field, PrincipalField}, + type_state::DataType, +}; -pub(crate) async fn migrate_push_subscriptions( +pub(crate) async fn migrate_push_subscriptions_v011( server: &Server, account_id: u32, ) -> trc::Result { // Obtain email ids - let todo = "fix"; let push_subscription_ids = server .get_document_ids(account_id, Collection::PushSubscription) .await @@ -31,7 +34,7 @@ pub(crate) async fn migrate_push_subscriptions( if num_push_subscriptions == 0 { return Ok(0); } - let mut did_migrate = false; + let mut subscriptions = Vec::with_capacity(num_push_subscriptions as usize); for push_subscription_id in &push_subscription_ids { match server @@ -45,62 +48,50 @@ pub(crate) async fn migrate_push_subscriptions( .await { Ok(Some(legacy)) => { - let mut batch = BatchBuilder::new(); - batch - .with_account_id(account_id) - .with_collection(Collection::PushSubscription) - .update_document(push_subscription_id) - .set( - Field::ARCHIVE, - Archiver::new(PushSubscription::from_legacy(legacy)) - .serialize() - .caused_by(trc::location!())?, - ); - did_migrate = true; - - server - .store() - .write(batch.build_all()) - .await - .caused_by(trc::location!())?; + let mut subscription = PushSubscription::from_legacy(legacy); + subscription.id = push_subscription_id; + subscriptions.push(subscription); } Ok(None) => (), Err(err) => { - if server - .store() - .get_value::>(ValueKey { - account_id, - collection: Collection::PushSubscription.into(), - document_id: push_subscription_id, - class: ValueClass::Property(Field::ARCHIVE.into()), - }) - .await - .is_err() - { - return Err(err - .account_id(account_id) - .document_id(push_subscription_id) - .caused_by(trc::location!())); - } + return Err(err + .account_id(account_id) + .document_id(push_subscription_id) + .caused_by(trc::location!())); } } } - // Increment document id counter - if did_migrate { + if !subscriptions.is_empty() { + // Save changes + let num_push_subscriptions = subscriptions.len() as u64; + let mut batch = BatchBuilder::new(); + + batch + .with_account_id(u32::MAX) + .with_collection(Collection::PushSubscription) + .create_document(account_id) + .with_account_id(account_id); + + for subscription in &subscriptions { + batch.delete_document(subscription.id).clear(Field::ARCHIVE); + } + + batch + .with_collection(Collection::Principal) + .update_document(0) + .set( + PrincipalField::PushSubscriptions, + Archiver::new(PushSubscriptions { subscriptions }) + .serialize() + .caused_by(trc::location!())?, + ); + server - .store() - .assign_document_ids( - account_id, - Collection::PushSubscription, - push_subscription_ids - .max() - .map(|id| id as u64) - .unwrap_or(num_push_subscriptions) - + 1, - ) + .commit_batch(batch) .await .caused_by(trc::location!())?; + Ok(num_push_subscriptions) } else { Ok(0) diff --git a/crates/migration/src/push_v2.rs b/crates/migration/src/push_v2.rs new file mode 100644 index 00000000..958e9bdc --- /dev/null +++ b/crates/migration/src/push_v2.rs @@ -0,0 +1,122 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use email::push::{Keys, PushSubscription, PushSubscriptions}; +use store::{ + Serialize, + write::{Archiver, BatchBuilder, now}, +}; +use trc::AddContext; +use types::{ + collection::Collection, + field::{Field, PrincipalField}, + type_state::DataType, +}; +use utils::map::bitmap::Bitmap; + +pub(crate) async fn migrate_push_subscriptions_v013( + server: &Server, + account_id: u32, +) -> trc::Result { + // Obtain email ids + let push_ids = server + .get_document_ids(account_id, Collection::PushSubscription) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_pushes = push_ids.len(); + if num_pushes == 0 { + return Ok(0); + } + let mut subscriptions = Vec::with_capacity(num_pushes as usize); + + for push_id in &push_ids { + match server + .get_archive(account_id, Collection::PushSubscription, push_id) + .await + { + Ok(Some(legacy)) => match legacy.deserialize_untrusted::() { + Ok(old_push) => { + subscriptions.push(PushSubscription { + id: push_id, + url: old_push.url, + device_client_id: old_push.device_client_id, + expires: old_push.expires, + verification_code: old_push.verification_code, + verified: old_push.verified, + types: old_push.types, + keys: old_push.keys, + email_push: Vec::new(), + }); + } + Err(err) => { + return Err(err.account_id(push_id).caused_by(trc::location!())); + } + }, + Ok(None) => (), + Err(err) => { + return Err(err.account_id(push_id).caused_by(trc::location!())); + } + } + } + + if !subscriptions.is_empty() { + // Save changes + let num_push_subscriptions = subscriptions.len() as u64; + let now = now(); + let mut batch = BatchBuilder::new(); + + // Delete archived and document ids + batch + .with_account_id(account_id) + .with_collection(Collection::PushSubscription) + .create_document(account_id); + for subscription in &subscriptions { + batch.delete_document(subscription.id).clear(Field::ARCHIVE); + } + + subscriptions.retain(|s| s.verified && s.expires > now); + + if !subscriptions.is_empty() { + batch + .with_account_id(u32::MAX) + .with_collection(Collection::PushSubscription) + .create_document(account_id) + .with_account_id(account_id) + .with_collection(Collection::Principal) + .update_document(0) + .set( + PrincipalField::PushSubscriptions, + Archiver::new(PushSubscriptions { subscriptions }) + .serialize() + .caused_by(trc::location!())?, + ); + } + + server + .commit_batch(batch) + .await + .caused_by(trc::location!())?; + + Ok(num_push_subscriptions) + } else { + Ok(0) + } +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Default, Debug, Clone, PartialEq, Eq, +)] +pub struct PushSubscriptionV2 { + pub url: String, + pub device_client_id: String, + pub expires: u64, + pub verification_code: String, + pub verified: bool, + pub types: Bitmap, + pub keys: Option, +} diff --git a/crates/migration/src/sieve.rs b/crates/migration/src/sieve_v1.rs similarity index 94% rename from crates/migration/src/sieve.rs rename to crates/migration/src/sieve_v1.rs index 6875ce1a..1d3b3edd 100644 --- a/crates/migration/src/sieve.rs +++ b/crates/migration/src/sieve_v1.rs @@ -17,24 +17,11 @@ use store::{ }; use trc::{AddContext, StoreEvent}; use types::{ - blob_hash::BlobHash, collection::Collection, field::{Field, PrincipalField, SieveField}, }; -#[derive( - rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, -)] -#[rkyv(derive(Debug))] -pub struct LegacySieveScript { - pub name: String, - pub is_active: bool, - pub blob_hash: BlobHash, - pub size: u32, - pub vacation_response: Option, -} - -pub(crate) async fn migrate_sieve(server: &Server, account_id: u32) -> trc::Result { +pub(crate) async fn migrate_sieve_v011(server: &Server, account_id: u32) -> trc::Result { // Obtain email ids let script_ids = server .get_document_ids(account_id, Collection::SieveScript) diff --git a/crates/migration/src/sieve_v2.rs b/crates/migration/src/sieve_v2.rs new file mode 100644 index 00000000..ab9014aa --- /dev/null +++ b/crates/migration/src/sieve_v2.rs @@ -0,0 +1,101 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use common::Server; +use email::sieve::{SieveScript, VacationResponse}; +use store::{ + Serialize, SerializeInfallible, + write::{Archiver, BatchBuilder}, +}; +use trc::AddContext; +use types::{ + blob_hash::BlobHash, + collection::Collection, + field::{Field, PrincipalField}, +}; + +pub(crate) async fn migrate_sieve_v013(server: &Server, account_id: u32) -> trc::Result { + // Obtain email ids + let script_ids = server + .get_document_ids(account_id, Collection::SieveScript) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + let num_scripts = script_ids.len(); + if num_scripts == 0 { + return Ok(0); + } + let mut num_migrated = 0; + + for script_id in &script_ids { + match server + .get_archive(account_id, Collection::SieveScript, script_id) + .await + { + Ok(Some(legacy)) => match legacy.deserialize_untrusted::() { + Ok(old_sieve) => { + let script = SieveScript { + name: old_sieve.name, + blob_hash: old_sieve.blob_hash, + size: old_sieve.size, + vacation_response: old_sieve.vacation_response, + }; + + let mut batch = BatchBuilder::new(); + batch + .with_account_id(account_id) + .with_collection(Collection::SieveScript) + .update_document(script_id) + .unindex(Field::new(0u8), vec![u8::from(old_sieve.is_active)]) + .set( + Field::ARCHIVE, + Archiver::new(script) + .serialize() + .caused_by(trc::location!())?, + ); + + if old_sieve.is_active { + batch + .with_account_id(account_id) + .with_collection(Collection::Principal) + .update_document(0) + .set(PrincipalField::ActiveScriptId, script_id.serialize()); + } + num_migrated += 1; + + server + .store() + .write(batch.build_all()) + .await + .caused_by(trc::location!())?; + } + Err(_) => { + if let Err(err) = legacy.deserialize_untrusted::() { + return Err(err.account_id(script_id).caused_by(trc::location!())); + } + } + }, + Ok(None) => (), + Err(err) => { + return Err(err.account_id(script_id).caused_by(trc::location!())); + } + } + } + + Ok(num_migrated) +} + +#[derive( + rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq, +)] +#[rkyv(derive(Debug))] +pub struct SieveScriptV2 { + pub name: String, + pub is_active: bool, + pub blob_hash: BlobHash, + pub size: u32, + pub vacation_response: Option, +} diff --git a/crates/migration/src/v011.rs b/crates/migration/src/v011.rs new file mode 100644 index 00000000..6804a7c8 --- /dev/null +++ b/crates/migration/src/v011.rs @@ -0,0 +1,176 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use crate::{ + LOCK_RETRY_TIME, LOCK_WAIT_TIME_ACCOUNT, LOCK_WAIT_TIME_CORE, + changelog::reset_changelog, + principal_v1::{migrate_principal_v0_11, migrate_principals_v0_11}, + queue::migrate_queue_v011, + report::migrate_reports, +}; +use common::{KV_LOCK_HOUSEKEEPER, Server}; +use store::{ + dispatch::lookup::KeyValue, + rand::{self, seq::SliceRandom}, +}; +use trc::AddContext; +use types::collection::Collection; + +pub(crate) async fn migrate_v0_11(server: &Server) -> trc::Result<()> { + let force_lock = std::env::var("FORCE_LOCK").is_ok(); + let in_memory = server.in_memory_store(); + let principal_ids; + + loop { + if force_lock + || in_memory + .try_lock( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_lock", + LOCK_WAIT_TIME_CORE, + ) + .await + .caused_by(trc::location!())? + { + if in_memory + .key_get::<()>(KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_done", + )) + .await + .caused_by(trc::location!())? + .is_none() + { + migrate_queue_v011(server) + .await + .caused_by(trc::location!())?; + migrate_reports(server).await.caused_by(trc::location!())?; + reset_changelog(server).await.caused_by(trc::location!())?; + principal_ids = migrate_principals_v0_11(server) + .await + .caused_by(trc::location!())?; + + in_memory + .key_set( + KeyValue::new( + KeyValue::<()>::build_key(KV_LOCK_HOUSEKEEPER, b"migrate_core_done"), + b"1".to_vec(), + ) + .expires(86400), + ) + .await + .caused_by(trc::location!())?; + } else { + principal_ids = server + .get_document_ids(u32::MAX, Collection::Principal) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration completed by another node.",) + ); + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock") + .await + .caused_by(trc::location!())?; + break; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration lock busy, waiting 30 seconds.",) + ); + + tokio::time::sleep(LOCK_RETRY_TIME).await; + } + } + + if !principal_ids.is_empty() { + let mut principal_ids = principal_ids.into_iter().collect::>(); + principal_ids.shuffle(&mut rand::rng()); + + loop { + let mut skipped_principal_ids = Vec::new(); + let mut num_migrated = 0; + + for principal_id in principal_ids { + let lock_key = format!("migrate_{principal_id}_lock"); + let done_key = format!("migrate_{principal_id}_done"); + + if force_lock + || in_memory + .try_lock( + KV_LOCK_HOUSEKEEPER, + lock_key.as_bytes(), + LOCK_WAIT_TIME_ACCOUNT, + ) + .await + .caused_by(trc::location!())? + { + if in_memory + .key_get::<()>(KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + done_key.as_bytes(), + )) + .await + .caused_by(trc::location!())? + .is_none() + { + migrate_principal_v0_11(server, principal_id) + .await + .caused_by(trc::location!())?; + + num_migrated += 1; + + in_memory + .key_set( + KeyValue::new( + KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + done_key.as_bytes(), + ), + b"1".to_vec(), + ) + .expires(86400), + ) + .await + .caused_by(trc::location!())?; + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes()) + .await + .caused_by(trc::location!())?; + } else { + skipped_principal_ids.push(principal_id); + } + } + + if !skipped_principal_ids.is_empty() { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!( + "Migrated {num_migrated} accounts and {} are locked by another node, waiting 30 seconds.", + skipped_principal_ids.len() + ) + ); + tokio::time::sleep(LOCK_RETRY_TIME).await; + principal_ids = skipped_principal_ids; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Account migration completed.",) + ); + break; + } + } + } + + Ok(()) +} diff --git a/crates/migration/src/v012.rs b/crates/migration/src/v012.rs new file mode 100644 index 00000000..17e012c3 --- /dev/null +++ b/crates/migration/src/v012.rs @@ -0,0 +1,61 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use crate::{ + LOCK_RETRY_TIME, LOCK_WAIT_TIME_CORE, event_v1::migrate_calendar_events_v012, + queue::migrate_queue_v012, tasks::migrate_tasks_v011, +}; +use common::{KV_LOCK_HOUSEKEEPER, Server}; +use trc::AddContext; + +pub(crate) async fn migrate_v0_12(server: &Server, migrate_tasks: bool) -> trc::Result<()> { + let force_lock = std::env::var("FORCE_LOCK").is_ok(); + let in_memory = server.in_memory_store(); + + loop { + if force_lock + || in_memory + .try_lock( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_lock", + LOCK_WAIT_TIME_CORE, + ) + .await + .caused_by(trc::location!())? + { + migrate_queue_v012(server) + .await + .caused_by(trc::location!())?; + + if migrate_tasks { + migrate_tasks_v011(server) + .await + .caused_by(trc::location!())?; + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock") + .await + .caused_by(trc::location!())?; + break; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration lock busy, waiting 30 seconds.",) + ); + + tokio::time::sleep(LOCK_RETRY_TIME).await; + } + } + + if migrate_tasks { + migrate_calendar_events_v012(server) + .await + .caused_by(trc::location!()) + } else { + Ok(()) + } +} diff --git a/crates/migration/src/v013.rs b/crates/migration/src/v013.rs new file mode 100644 index 00000000..4be9e2e3 --- /dev/null +++ b/crates/migration/src/v013.rs @@ -0,0 +1,168 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs LLC + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use crate::{ + LOCK_RETRY_TIME, LOCK_WAIT_TIME_ACCOUNT, LOCK_WAIT_TIME_CORE, + principal_v2::{migrate_principal_v0_13, migrate_principals_v0_13}, +}; +use common::{KV_LOCK_HOUSEKEEPER, Server}; +use store::{ + dispatch::lookup::KeyValue, + rand::{self, seq::SliceRandom}, +}; +use trc::AddContext; +use types::collection::Collection; + +pub(crate) async fn migrate_v0_13(server: &Server) -> trc::Result<()> { + let force_lock = std::env::var("FORCE_LOCK").is_ok(); + let in_memory = server.in_memory_store(); + let principal_ids; + + loop { + if force_lock + || in_memory + .try_lock( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_lock", + LOCK_WAIT_TIME_CORE, + ) + .await + .caused_by(trc::location!())? + { + if in_memory + .key_get::<()>(KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + b"migrate_core_done", + )) + .await + .caused_by(trc::location!())? + .is_none() + { + principal_ids = migrate_principals_v0_13(server) + .await + .caused_by(trc::location!())?; + + in_memory + .key_set( + KeyValue::new( + KeyValue::<()>::build_key(KV_LOCK_HOUSEKEEPER, b"migrate_core_done"), + b"1".to_vec(), + ) + .expires(86400), + ) + .await + .caused_by(trc::location!())?; + } else { + principal_ids = server + .get_document_ids(u32::MAX, Collection::Principal) + .await + .caused_by(trc::location!())? + .unwrap_or_default(); + + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration completed by another node.",) + ); + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock") + .await + .caused_by(trc::location!())?; + break; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Migration lock busy, waiting 30 seconds.",) + ); + + tokio::time::sleep(LOCK_RETRY_TIME).await; + } + } + + if !principal_ids.is_empty() { + let mut principal_ids = principal_ids.into_iter().collect::>(); + principal_ids.shuffle(&mut rand::rng()); + + loop { + let mut skipped_principal_ids = Vec::new(); + let mut num_migrated = 0; + + for principal_id in principal_ids { + let lock_key = format!("migrate_{principal_id}_lock"); + let done_key = format!("migrate_{principal_id}_done"); + + if force_lock + || in_memory + .try_lock( + KV_LOCK_HOUSEKEEPER, + lock_key.as_bytes(), + LOCK_WAIT_TIME_ACCOUNT, + ) + .await + .caused_by(trc::location!())? + { + if in_memory + .key_get::<()>(KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + done_key.as_bytes(), + )) + .await + .caused_by(trc::location!())? + .is_none() + { + migrate_principal_v0_13(server, principal_id) + .await + .caused_by(trc::location!())?; + + num_migrated += 1; + + in_memory + .key_set( + KeyValue::new( + KeyValue::<()>::build_key( + KV_LOCK_HOUSEKEEPER, + done_key.as_bytes(), + ), + b"1".to_vec(), + ) + .expires(86400), + ) + .await + .caused_by(trc::location!())?; + } + + in_memory + .remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes()) + .await + .caused_by(trc::location!())?; + } else { + skipped_principal_ids.push(principal_id); + } + } + + if !skipped_principal_ids.is_empty() { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!( + "Migrated {num_migrated} accounts and {} are locked by another node, waiting 30 seconds.", + skipped_principal_ids.len() + ) + ); + tokio::time::sleep(LOCK_RETRY_TIME).await; + principal_ids = skipped_principal_ids; + } else { + trc::event!( + Server(trc::ServerEvent::Startup), + Details = format!("Account migration completed.",) + ); + break; + } + } + } + + Ok(()) +} diff --git a/crates/types/src/field.rs b/crates/types/src/field.rs index 87bcc0a7..e9843b99 100644 --- a/crates/types/src/field.rs +++ b/crates/types/src/field.rs @@ -234,6 +234,10 @@ impl From for Field { impl Field { pub const ARCHIVE: Field = Field(ARCHIVE_FIELD); + + pub fn new(value: u8) -> Self { + Field(value) + } } impl FieldType for Field {} diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index dd6d478a..baf81b69 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -80,7 +80,7 @@ async fn jmap_tests() { server::webhooks::test(&mut params).await; - mail::get::test(&mut params).await; + /*mail::get::test(&mut params).await; mail::set::test(&mut params).await; mail::parse::test(&mut params).await; mail::query::test(&mut params, delete).await; @@ -105,7 +105,7 @@ async fn jmap_tests() { auth::limits::test(&mut params).await; auth::oauth::test(&mut params).await; - auth::quota::test(&mut params).await; + auth::quota::test(&mut params).await;*/ auth::permissions::test(¶ms).await; contacts::addressbook::test(&mut params).await;