v0.13 migration

mdecimus 2025-10-21 15:38:53 +02:00
parent 4620a92fd8
commit c5e264574b
20 changed files with 1845 additions and 343 deletions

Cargo.lock (generated, 35 lines changed)

@@ -895,6 +895,21 @@ dependencies = [
 "pkg-config",
]

+[[package]]
+name = "calcard"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "720e412adf25f179f643b0753108cb308b812f82e1d34131c06b015c806e3f3c"
+dependencies = [
+ "ahash",
+ "chrono",
+ "chrono-tz",
+ "hashify",
+ "mail-builder",
+ "mail-parser",
+ "rkyv",
+]
+
[[package]]
name = "calcard"
version = "0.3.1"
@@ -1165,7 +1180,7 @@ dependencies = [
 "base64 0.22.1",
 "bincode 2.0.1",
 "biscuit",
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "compact_str",
 "decancer",
@@ -1700,7 +1715,7 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
name = "dav"
version = "0.14.0"
dependencies = [
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "common",
 "compact_str",
@@ -1722,7 +1737,7 @@ dependencies = [
name = "dav-proto"
version = "0.14.0"
dependencies = [
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "compact_str",
 "hashify",
@@ -2776,7 +2791,7 @@ name = "groupware"
version = "0.14.0"
dependencies = [
 "ahash",
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "common",
 "compact_str",
@@ -3790,7 +3805,7 @@ dependencies = [
 "aes-gcm-siv",
 "async-stream",
 "base64 0.22.1",
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "common",
 "compact_str",
@@ -3897,7 +3912,7 @@ name = "jmap_proto"
version = "0.14.0"
dependencies = [
 "ahash",
- "calcard",
+ "calcard 0.3.1",
 "compact_str",
 "hashify",
 "jmap-tools",
@@ -4459,7 +4474,8 @@ version = "0.14.0"
dependencies = [
 "base64 0.22.1",
 "bincode 1.3.3",
- "calcard",
+ "calcard 0.1.3",
+ "calcard 0.3.1",
 "common",
 "compact_str",
 "dav-proto",
@@ -4470,6 +4486,7 @@ dependencies = [
 "mail-auth",
 "mail-parser",
 "nlp",
+ "proc_macros",
 "rkyv",
 "serde",
 "serde_json",
@@ -7281,7 +7298,7 @@ dependencies = [
 "aes-gcm",
 "aes-gcm-siv",
 "base64 0.22.1",
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "common",
 "compact_str",
@@ -7948,7 +7965,7 @@ dependencies = [
 "base64 0.22.1",
 "biscuit",
 "bytes",
- "calcard",
+ "calcard 0.3.1",
 "chrono",
 "common",
 "compact_str",


@@ -87,10 +87,11 @@ Schema history:
1 - v0.12.0
2 - v0.12.4
3 - v0.13.0
+4 - v0.14.0
*/
-pub const DATABASE_SCHEMA_VERSION: u32 = 3;
+pub const DATABASE_SCHEMA_VERSION: u32 = 4;
pub const LONG_1D_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24);
pub const LONG_1Y_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24 * 365);


@@ -16,10 +16,12 @@ directory = { path = "../directory" }
smtp = { path = "../smtp" }
groupware = { path = "../groupware" }
dav-proto = { path = "../dav-proto" }
+proc_macros = { path = "../utils/proc-macros" }
mail-parser = { version = "0.11", features = ["full_encoding"] }
mail-auth = { version = "0.7.1", features = ["rkyv"] }
sieve-rs = { version = "0.7", features = ["rkyv"] }
-calcard = { version = "0.3", features = ["rkyv"] }
+calcard_latest = { package = "calcard", version = "0.3", features = ["rkyv"] }
+calcard_v01 = { package = "calcard", version = "0.1", features = ["rkyv"] }
tokio = { version = "1.47", features = ["net", "macros"] }
serde = { version = "1.0", features = ["derive"]}
serde_json = "1.0"
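
Because both entries set package = "calcard", Cargo treats them as two independent versions of the same crate, addressable in code under the renamed identifiers. A minimal sketch (the aliases are illustrative; the migration modules below use fully qualified paths instead):

// Both on-disk layouts are available at once under the renamed crates.
use calcard_v01::icalendar::ICalendar as ICalendarV01; // layout stored by v0.13
use calcard_latest::icalendar::ICalendar; // layout written from v0.14 onwards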


@@ -0,0 +1,101 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::Server;
use groupware::contact::{AddressBook, AddressBookPreferences};
use store::{
Serialize,
write::{Archiver, BatchBuilder, serialize::rkyv_deserialize},
};
use trc::AddContext;
use types::{acl::AclGrant, collection::Collection, dead_property::DeadProperty, field::Field};
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
#[rkyv(derive(Debug))]
pub struct AddressBookV2 {
pub name: String,
pub display_name: Option<String>,
pub description: Option<String>,
pub sort_order: u32,
pub is_default: bool,
pub subscribers: Vec<u32>,
pub dead_properties: DeadProperty,
pub acls: Vec<AclGrant>,
pub created: i64,
pub modified: i64,
}
pub(crate) async fn migrate_addressbook_v013(server: &Server, account_id: u32) -> trc::Result<u64> {
let document_ids = server
.get_document_ids(account_id, Collection::AddressBook)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
if document_ids.is_empty() {
return Ok(0);
}
let mut num_migrated = 0;
for document_id in document_ids.iter() {
let Some(archive) = server
.get_archive(account_id, Collection::AddressBook, document_id)
.await
.caused_by(trc::location!())?
else {
continue;
};
match archive.unarchive_untrusted::<AddressBookV2>() {
Ok(book) => {
let book = rkyv_deserialize::<_, AddressBookV2>(book).unwrap();
let new_book = AddressBook {
name: book.name,
preferences: vec![AddressBookPreferences {
account_id,
name: book
.display_name
.unwrap_or_else(|| "Address Book".to_string()),
description: book.description,
sort_order: book.sort_order,
}],
subscribers: book.subscribers,
dead_properties: book.dead_properties,
acls: book.acls,
created: book.created,
modified: book.modified,
};
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::AddressBook)
.update_document(document_id)
.set(
Field::ARCHIVE,
Archiver::new(new_book)
.serialize()
.caused_by(trc::location!())?,
);
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
num_migrated += 1;
}
Err(err) => {
if let Err(err_) = archive.unarchive_untrusted::<AddressBook>() {
trc::error!(err_.caused_by(trc::location!()));
return Err(err.caused_by(trc::location!()));
}
}
}
}
Ok(num_migrated)
}
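
Every per-document loop in this commit follows the same idempotent shape, which makes interrupted runs safe to repeat: decode the old layout and rewrite on success; on failure, check whether the document already decodes as the new type and skip it if so. Schematically (OldType and NewType are placeholders, not identifiers from this commit):

match archive.unarchive_untrusted::<OldType>() {
Ok(old) => {
// Map OldType to NewType and rewrite the document in a batch.
}
Err(err) => {
// Not the old layout: tolerate documents that already use the new
// layout (migrated by a previous run); fail on anything else.
if archive.unarchive_untrusted::<NewType>().is_err() {
return Err(err);
}
}
}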


@@ -0,0 +1,144 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::Server;
use groupware::calendar::{Calendar, CalendarPreferences, Timezone};
use store::{
Serialize,
write::{Archiver, BatchBuilder, serialize::rkyv_deserialize},
};
use trc::AddContext;
use types::{acl::AclGrant, collection::Collection, dead_property::DeadProperty, field::Field};
use crate::event_v2::migrate_icalendar_v02;
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct CalendarV2 {
pub name: String,
pub preferences: Vec<CalendarPreferencesV2>,
pub default_alerts: Vec<DefaultAlertV2>,
pub acls: Vec<AclGrant>,
pub dead_properties: DeadProperty,
pub created: i64,
pub modified: i64,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct CalendarPreferencesV2 {
pub account_id: u32,
pub name: String,
pub description: Option<String>,
pub sort_order: u32,
pub color: Option<String>,
pub flags: u16,
pub time_zone: TimezoneV2,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub enum TimezoneV2 {
IANA(u16),
Custom(calcard_v01::icalendar::ICalendar),
#[default]
Default,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct DefaultAlertV2 {
pub account_id: u32,
pub id: String,
pub alert: calcard_v01::icalendar::ICalendar,
pub with_time: bool,
}
pub(crate) async fn migrate_calendar_v013(server: &Server, account_id: u32) -> trc::Result<u64> {
let document_ids = server
.get_document_ids(account_id, Collection::Calendar)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
if document_ids.is_empty() {
return Ok(0);
}
let mut num_migrated = 0;
for document_id in document_ids.iter() {
let Some(archive) = server
.get_archive(account_id, Collection::Calendar, document_id)
.await
.caused_by(trc::location!())?
else {
continue;
};
match archive.unarchive_untrusted::<CalendarV2>() {
Ok(calendar) => {
let calendar = rkyv_deserialize::<_, CalendarV2>(calendar).unwrap();
let new_calendar = Calendar {
name: calendar.name,
preferences: calendar
.preferences
.into_iter()
.map(|pref| CalendarPreferences {
account_id: pref.account_id,
name: pref.name,
description: pref.description,
sort_order: pref.sort_order,
color: pref.color,
flags: 0,
time_zone: match pref.time_zone {
TimezoneV2::IANA(tzid) => Timezone::IANA(tzid),
TimezoneV2::Custom(tz) => {
Timezone::Custom(migrate_icalendar_v02(tz))
}
TimezoneV2::Default => Timezone::Default,
},
default_alerts: Vec::new(),
})
.collect(),
acls: calendar.acls,
supported_components: 0,
dead_properties: calendar.dead_properties,
created: calendar.created,
modified: calendar.modified,
};
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Calendar)
.update_document(document_id)
.set(
Field::ARCHIVE,
Archiver::new(new_calendar)
.serialize()
.caused_by(trc::location!())?,
);
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
num_migrated += 1;
}
Err(err) => {
if let Err(err_) = archive.unarchive_untrusted::<Calendar>() {
trc::error!(err_.caused_by(trc::location!()));
return Err(err.caused_by(trc::location!()));
}
}
}
}
Ok(num_migrated)
}


@@ -0,0 +1,89 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{DavName, Server};
use groupware::contact::ContactCard;
use store::{
Serialize,
write::{Archiver, BatchBuilder, serialize::rkyv_deserialize},
};
use trc::AddContext;
use types::{collection::Collection, dead_property::DeadProperty, field::Field};
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct ContactCardV2 {
pub names: Vec<DavName>,
pub display_name: Option<String>,
pub card: calcard_v01::vcard::VCard,
pub dead_properties: DeadProperty,
pub created: i64,
pub modified: i64,
pub size: u32,
}
pub(crate) async fn migrate_contacts_v013(server: &Server, account_id: u32) -> trc::Result<u64> {
let document_ids = server
.get_document_ids(account_id, Collection::ContactCard)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
let mut num_migrated = 0;
for document_id in document_ids.iter() {
let Some(archive) = server
.get_archive(account_id, Collection::ContactCard, document_id)
.await
.caused_by(trc::location!())?
else {
continue;
};
match archive.unarchive_untrusted::<ContactCardV2>() {
Ok(contact) => {
let contact = rkyv_deserialize::<_, ContactCardV2>(contact).unwrap();
let new_contact = ContactCard {
names: contact.names,
display_name: contact.display_name,
dead_properties: contact.dead_properties,
size: contact.size,
created: contact.created,
modified: contact.modified,
card: calcard_latest::vcard::VCard::parse(contact.card.to_string())
.unwrap_or_default(),
};
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::ContactCard)
.update_document(document_id)
.set(
Field::ARCHIVE,
Archiver::new(new_contact)
.serialize()
.caused_by(trc::location!())?,
);
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
num_migrated += 1;
}
Err(err) => {
if let Err(err_) = archive.unarchive_untrusted::<ContactCard>() {
trc::error!(err_.caused_by(trc::location!()));
return Err(err.caused_by(trc::location!()));
}
}
}
}
Ok(num_migrated)
}


@@ -4,7 +4,6 @@
 * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
 */
-use calcard::{common::timezone::Tz, icalendar::ICalendar};
use common::{DavName, Server};
use groupware::calendar::{AlarmDelta, CalendarEvent, CalendarEventData, ComponentTimeRange};
use store::{
@@ -15,6 +14,8 @@ use store::{
use trc::AddContext;
use types::{collection::Collection, dead_property::DeadProperty, field::Field};

+use crate::event_v2::migrate_icalendar_v02;
+
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
@@ -35,14 +36,14 @@ pub struct CalendarEventV1 {
)]
pub struct UserProperties {
pub account_id: u32,
-pub properties: ICalendar,
+pub properties: calcard_v01::icalendar::ICalendar,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct CalendarEventDataV1 {
-pub event: ICalendar,
+pub event: calcard_v01::icalendar::ICalendar,
pub time_ranges: Box<[ComponentTimeRange]>,
pub alarms: Box<[AlarmV1]>,
pub base_offset: i64,
@@ -59,7 +60,7 @@ pub struct AlarmV1 {
pub alarms: Box<[AlarmDelta]>,
}
-pub(crate) async fn migrate_calendar_events(server: &Server) -> trc::Result<()> {
+pub(crate) async fn migrate_calendar_events_v012(server: &Server) -> trc::Result<()> {
// Obtain email ids
let account_ids = server
.get_document_ids(u32::MAX, Collection::Principal)
@@ -103,8 +104,8 @@ pub(crate) async fn migrate_calendar_events(server: &Server) -> trc::Result<()>
names: event.names,
display_name: event.display_name,
data: CalendarEventData::new(
-event.data.event,
-Tz::Floating,
+migrate_icalendar_v02(event.data.event),
+calcard_latest::common::timezone::Tz::Floating,
server.core.groupware.max_ical_instances,
&mut next_email_alarm,
),


@@ -0,0 +1,212 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::{DavName, Server};
use groupware::calendar::{
Alarm, CalendarEvent, CalendarEventData, CalendarEventNotification, ComponentTimeRange,
};
use store::{
Serialize,
write::{Archiver, BatchBuilder, serialize::rkyv_deserialize},
};
use trc::AddContext;
use types::{collection::Collection, dead_property::DeadProperty, field::Field};
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct CalendarEventV2 {
pub names: Vec<DavName>,
pub display_name: Option<String>,
pub data: CalendarEventDataV2,
pub user_properties: Vec<UserPropertiesV2>,
pub flags: u16,
pub dead_properties: DeadProperty,
pub size: u32,
pub created: i64,
pub modified: i64,
pub schedule_tag: Option<u32>,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct UserPropertiesV2 {
pub account_id: u32,
pub properties: calcard_v01::icalendar::ICalendar,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct CalendarEventDataV2 {
pub event: calcard_v01::icalendar::ICalendar,
pub time_ranges: Box<[ComponentTimeRange]>,
pub alarms: Box<[Alarm]>,
pub base_offset: i64,
pub base_time_utc: u32,
pub duration: u32,
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
pub struct CalendarEventNotificationV2 {
pub itip: calcard_v01::icalendar::ICalendar,
pub event_id: Option<u32>,
pub flags: u16,
pub size: u32,
pub created: i64,
pub modified: i64,
}
pub(crate) async fn migrate_calendar_events_v013(
server: &Server,
account_id: u32,
) -> trc::Result<u64> {
let document_ids = server
.get_document_ids(account_id, Collection::CalendarEvent)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
let mut num_migrated = 0;
for document_id in document_ids.iter() {
let Some(archive) = server
.get_archive(account_id, Collection::CalendarEvent, document_id)
.await
.caused_by(trc::location!())?
else {
continue;
};
match archive.unarchive_untrusted::<CalendarEventV2>() {
Ok(event) => {
let event = rkyv_deserialize::<_, CalendarEventV2>(event).unwrap();
let new_event = CalendarEvent {
names: event.names,
display_name: event.display_name,
data: CalendarEventData {
event: migrate_icalendar_v02(event.data.event),
time_ranges: event.data.time_ranges,
alarms: event.data.alarms,
base_offset: event.data.base_offset,
base_time_utc: event.data.base_time_utc,
duration: event.data.duration,
},
preferences: Default::default(),
flags: event.flags,
dead_properties: event.dead_properties,
size: event.size,
created: event.created,
modified: event.modified,
schedule_tag: None,
};
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::CalendarEvent)
.update_document(document_id)
.set(
Field::ARCHIVE,
Archiver::new(new_event)
.serialize()
.caused_by(trc::location!())?,
);
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
num_migrated += 1;
}
Err(err) => {
if let Err(err_) = archive.unarchive_untrusted::<CalendarEvent>() {
trc::error!(err_.caused_by(trc::location!()));
return Err(err.caused_by(trc::location!()));
}
}
}
}
Ok(num_migrated)
}
pub(crate) async fn migrate_calendar_scheduling_v013(
server: &Server,
account_id: u32,
) -> trc::Result<u64> {
let document_ids = server
.get_document_ids(account_id, Collection::CalendarEventNotification)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
let mut num_migrated = 0;
for document_id in document_ids.iter() {
let Some(archive) = server
.get_archive(
account_id,
Collection::CalendarEventNotification,
document_id,
)
.await
.caused_by(trc::location!())?
else {
continue;
};
match archive.unarchive_untrusted::<CalendarEventNotificationV2>() {
Ok(event) => {
let event = rkyv_deserialize::<_, CalendarEventNotificationV2>(event).unwrap();
let new_event = CalendarEventNotification {
event: migrate_icalendar_v02(event.itip),
event_id: event.event_id,
changed_by: Default::default(),
flags: 0,
size: event.size,
created: event.created,
modified: event.modified,
};
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::CalendarEventNotification)
.update_document(document_id)
.set(
Field::ARCHIVE,
Archiver::new(new_event)
.serialize()
.caused_by(trc::location!())?,
);
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
num_migrated += 1;
}
Err(err) => {
if let Err(err_) = archive.unarchive_untrusted::<CalendarEventNotification>() {
trc::error!(err_.caused_by(trc::location!()));
return Err(err.caused_by(trc::location!()));
}
}
}
}
Ok(num_migrated)
}
pub(crate) fn migrate_icalendar_v02(
ical: calcard_v01::icalendar::ICalendar,
) -> calcard_latest::icalendar::ICalendar {
calcard_latest::icalendar::ICalendar::parse(ical.to_string()).unwrap_or_default()
}
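
The upgrade deliberately round-trips through the textual iCalendar form rather than mapping the rkyv structures field by field: the v0.1 structure is rendered with to_string() and re-parsed by the 0.3 parser, and anything the new parser rejects degrades to an empty default document instead of aborting the migration. contact_v2.rs applies the same strategy to vCards inline; a helper in the same shape would look like this (sketch only, not part of this commit):

pub(crate) fn migrate_vcard_v02(
card: calcard_v01::vcard::VCard,
) -> calcard_latest::vcard::VCard {
calcard_latest::vcard::VCard::parse(card.to_string()).unwrap_or_default()
}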


@@ -5,71 +5,58 @@
*/
use crate::{
-calendar::migrate_calendar_events,
queue::{migrate_queue_v011, migrate_queue_v012},
-tasks::migrate_tasks_v011,
+v011::migrate_v0_11,
+v012::migrate_v0_12,
+v013::migrate_v0_13,
};
-use changelog::reset_changelog;
-use common::{
-DATABASE_SCHEMA_VERSION, KV_LOCK_HOUSEKEEPER, Server, manager::boot::DEFAULT_SETTINGS,
-};
-use principal::{migrate_principal, migrate_principals};
-use report::migrate_reports;
+use common::{DATABASE_SCHEMA_VERSION, Server, manager::boot::DEFAULT_SETTINGS};
use std::time::Duration;
use store::{
Deserialize, IterateParams, SUBSPACE_PROPERTY, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_REPORT_IN,
SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SerializeInfallible, U32_LEN, Value, ValueKey,
-dispatch::{DocumentSet, lookup::KeyValue},
-rand::{self, seq::SliceRandom},
+dispatch::DocumentSet,
write::{AnyClass, AnyKey, BatchBuilder, ValueClass, key::DeserializeBigEndian},
};
use trc::AddContext;
use types::collection::Collection;

-pub mod calendar;
+pub mod addressbook_v2;
+pub mod calendar_v2;
pub mod changelog;
+pub mod contact_v2;
pub mod email;
pub mod encryption;
+pub mod event_v1;
+pub mod event_v2;
pub mod identity;
pub mod mailbox;
pub mod object;
-pub mod principal;
-pub mod push;
+pub mod principal_v1;
+pub mod principal_v2;
+pub mod push_v1;
+pub mod push_v2;
pub mod queue;
pub mod report;
-pub mod sieve;
+pub mod sieve_v1;
+pub mod sieve_v2;
pub mod submission;
pub mod tasks;
pub mod threads;
+pub mod v011;
+pub mod v012;
+pub mod v013;

const LOCK_WAIT_TIME_ACCOUNT: u64 = 3 * 60;
const LOCK_WAIT_TIME_CORE: u64 = 5 * 60;
const LOCK_RETRY_TIME: Duration = Duration::from_secs(30);

-/*
-pub struct AddressBook {
-pub name: String,
-pub display_name: Option<String>,
-pub description: Option<String>,
-pub sort_order: u32,
-pub is_default: bool,
-pub subscribers: Vec<u32>,
-pub dead_properties: DeadProperty,
-pub acls: Vec<AclGrant>,
-pub created: i64,
-pub modified: i64,
-}
-*/
pub async fn try_migrate(server: &Server) -> trc::Result<()> {
if let Some(version) = std::env::var("FORCE_MIGRATE_QUEUE")
.ok()
.and_then(|s| s.parse::<u32>().ok())
{
-if version == 12 {
+if version == 12 || version <= 2 {
migrate_queue_v012(server)
.await
.caused_by(trc::location!())?;
@@ -79,15 +66,8 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> {
.caused_by(trc::location!())?;
}
return Ok(());
-} else if let Some(account_id) = std::env::var("FORCE_MIGRATE_ACCOUNT")
-.ok()
-.and_then(|s| s.parse().ok())
-{
-migrate_principal(server, account_id)
-.await
-.caused_by(trc::location!())?;
-return Ok(());
-} else if let Some(version) = std::env::var("FORCE_MIGRATE")
+}
+if let Some(version) = std::env::var("FORCE_MIGRATE")
.ok()
.and_then(|s| s.parse::<u32>().ok())
{
@@ -96,11 +76,16 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> {
migrate_v0_12(server, true)
.await
.caused_by(trc::location!())?;
+migrate_v0_13(server).await.caused_by(trc::location!())?;
}
2 => {
migrate_v0_12(server, false)
.await
.caused_by(trc::location!())?;
+migrate_v0_13(server).await.caused_by(trc::location!())?;
+}
+3 => {
+migrate_v0_13(server).await.caused_by(trc::location!())?;
}
_ => {
panic!("Unknown migration version: {version}");
@@ -125,14 +110,20 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> {
migrate_v0_12(server, true)
.await
.caused_by(trc::location!())?;
+migrate_v0_13(server).await.caused_by(trc::location!())?;
true
}
Some(2) => {
migrate_v0_12(server, false)
.await
.caused_by(trc::location!())?;
+migrate_v0_13(server).await.caused_by(trc::location!())?;
true
}
+Some(3) => {
+migrate_v0_13(server).await.caused_by(trc::location!())?;
+false
+}
Some(version) => {
panic!(
"Unknown database schema version, expected {} or below, found {}",
@@ -184,211 +175,6 @@ pub async fn try_migrate(server: &Server) -> trc::Result<()> {
Ok(())
}

-async fn migrate_v0_12(server: &Server, migrate_tasks: bool) -> trc::Result<()> {
-let force_lock = std::env::var("FORCE_LOCK").is_ok();
-let in_memory = server.in_memory_store();
-loop {
-if force_lock
-|| in_memory
-.try_lock(
-KV_LOCK_HOUSEKEEPER,
-b"migrate_core_lock",
-LOCK_WAIT_TIME_CORE,
-)
-.await
-.caused_by(trc::location!())?
-{
-migrate_queue_v012(server)
-.await
-.caused_by(trc::location!())?;
-if migrate_tasks {
-migrate_tasks_v011(server)
-.await
-.caused_by(trc::location!())?;
-}
-in_memory
-.remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock")
-.await
-.caused_by(trc::location!())?;
-break;
-} else {
-trc::event!(
-Server(trc::ServerEvent::Startup),
-Details = format!("Migration lock busy, waiting 30 seconds.",)
-);
-tokio::time::sleep(LOCK_RETRY_TIME).await;
-}
-}
-if migrate_tasks {
-migrate_calendar_events(server)
-.await
-.caused_by(trc::location!())
-} else {
-Ok(())
-}
-}
-async fn migrate_v0_11(server: &Server) -> trc::Result<()> {
-let force_lock = std::env::var("FORCE_LOCK").is_ok();
-let in_memory = server.in_memory_store();
-let principal_ids;
-loop {
-if force_lock
-|| in_memory
-.try_lock(
-KV_LOCK_HOUSEKEEPER,
-b"migrate_core_lock",
-LOCK_WAIT_TIME_CORE,
-)
-.await
-.caused_by(trc::location!())?
-{
-if in_memory
-.key_get::<()>(KeyValue::<()>::build_key(
-KV_LOCK_HOUSEKEEPER,
-b"migrate_core_done",
-))
-.await
-.caused_by(trc::location!())?
-.is_none()
-{
-migrate_queue_v011(server)
-.await
-.caused_by(trc::location!())?;
-migrate_reports(server).await.caused_by(trc::location!())?;
-reset_changelog(server).await.caused_by(trc::location!())?;
-principal_ids = migrate_principals(server)
-.await
-.caused_by(trc::location!())?;
-in_memory
-.key_set(
-KeyValue::new(
-KeyValue::<()>::build_key(KV_LOCK_HOUSEKEEPER, b"migrate_core_done"),
-b"1".to_vec(),
-)
-.expires(86400),
-)
-.await
-.caused_by(trc::location!())?;
-} else {
-principal_ids = server
-.get_document_ids(u32::MAX, Collection::Principal)
-.await
-.caused_by(trc::location!())?
-.unwrap_or_default();
-trc::event!(
-Server(trc::ServerEvent::Startup),
-Details = format!("Migration completed by another node.",)
-);
-}
-in_memory
-.remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock")
-.await
-.caused_by(trc::location!())?;
-break;
-} else {
-trc::event!(
-Server(trc::ServerEvent::Startup),
-Details = format!("Migration lock busy, waiting 30 seconds.",)
-);
-tokio::time::sleep(LOCK_RETRY_TIME).await;
-}
-}
-if !principal_ids.is_empty() {
-let mut principal_ids = principal_ids.into_iter().collect::<Vec<_>>();
-principal_ids.shuffle(&mut rand::rng());
-loop {
-let mut skipped_principal_ids = Vec::new();
-let mut num_migrated = 0;
-for principal_id in principal_ids {
-let lock_key = format!("migrate_{principal_id}_lock");
-let done_key = format!("migrate_{principal_id}_done");
-if force_lock
-|| in_memory
-.try_lock(
-KV_LOCK_HOUSEKEEPER,
-lock_key.as_bytes(),
-LOCK_WAIT_TIME_ACCOUNT,
-)
-.await
-.caused_by(trc::location!())?
-{
-if in_memory
-.key_get::<()>(KeyValue::<()>::build_key(
-KV_LOCK_HOUSEKEEPER,
-done_key.as_bytes(),
-))
-.await
-.caused_by(trc::location!())?
-.is_none()
-{
-migrate_principal(server, principal_id)
-.await
-.caused_by(trc::location!())?;
-num_migrated += 1;
-in_memory
-.key_set(
-KeyValue::new(
-KeyValue::<()>::build_key(
-KV_LOCK_HOUSEKEEPER,
-done_key.as_bytes(),
-),
-b"1".to_vec(),
-)
-.expires(86400),
-)
-.await
-.caused_by(trc::location!())?;
-}
-in_memory
-.remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes())
-.await
-.caused_by(trc::location!())?;
-} else {
-skipped_principal_ids.push(principal_id);
-}
-}
-if !skipped_principal_ids.is_empty() {
-trc::event!(
-Server(trc::ServerEvent::Startup),
-Details = format!(
-"Migrated {num_migrated} accounts and {} are locked by another node, waiting 30 seconds.",
-skipped_principal_ids.len()
-)
-);
-tokio::time::sleep(LOCK_RETRY_TIME).await;
-principal_ids = skipped_principal_ids;
-} else {
-trc::event!(
-Server(trc::ServerEvent::Startup),
-Details = format!("Account migration completed.",)
-);
-break;
-}
-}
-}
-Ok(())
-}
async fn is_new_install(server: &Server) -> trc::Result<bool> {
for subspace in [
SUBSPACE_QUEUE_MESSAGE,
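
For operators, the FORCE_* environment variables checked at the top of try_migrate allow individual phases to be re-run by hand; plausible invocations (the server command itself is elided):

FORCE_MIGRATE=3 ...        # re-run only the v0.13 migration
FORCE_MIGRATE=1 ...        # re-run v0.12 (including tasks), then v0.13
FORCE_MIGRATE_QUEUE=12 ... # re-run only the v0.12 queue migration
FORCE_LOCK=1 ...           # any value: skip waiting on the cluster migration locks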


@@ -4,6 +4,11 @@
 * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
 */
+use crate::{
+email::migrate_emails, encryption::migrate_encryption_params, identity::migrate_identities,
+mailbox::migrate_mailboxes, push_v1::migrate_push_subscriptions_v011,
+sieve_v1::migrate_sieve_v011, submission::migrate_email_submissions, threads::migrate_threads,
+};
use common::Server;
use directory::{
Permission, Principal, PrincipalData, ROLE_ADMIN, ROLE_USER, Type,
@@ -22,13 +27,7 @@ use trc::AddContext;
use types::collection::Collection;
use utils::codec::leb128::Leb128Iterator;

-use crate::{
-email::migrate_emails, encryption::migrate_encryption_params, identity::migrate_identities,
-mailbox::migrate_mailboxes, push::migrate_push_subscriptions, sieve::migrate_sieve,
-submission::migrate_email_submissions, threads::migrate_threads,
-};
-
-pub(crate) async fn migrate_principals(server: &Server) -> trc::Result<RoaringBitmap> {
+pub(crate) async fn migrate_principals_v0_11(server: &Server) -> trc::Result<RoaringBitmap> {
// Obtain email ids
let principal_ids = server
.get_document_ids(u32::MAX, Collection::Principal)
@@ -53,7 +52,8 @@ pub(crate) async fn migrate_principals(server: &Server) -> trc::Result<RoaringBitmap> {
.await
{
Ok(Some(legacy)) => {
-let principal = Principal::from_legacy(legacy);
+let mut principal = Principal::from_legacy(legacy);
+principal.sort();
let mut batch = BatchBuilder::new();
batch
.with_account_id(u32::MAX)
@@ -120,7 +120,7 @@ pub(crate) async fn migrate_principals(server: &Server) -> trc::Result<RoaringBitmap> {
Ok(principal_ids)
}
-pub(crate) async fn migrate_principal(server: &Server, account_id: u32) -> trc::Result<()> {
+pub(crate) async fn migrate_principal_v0_11(server: &Server, account_id: u32) -> trc::Result<()> {
let start_time = Instant::now();
let num_emails = migrate_emails(server, account_id)
.await
@@ -131,10 +131,10 @@ pub(crate) async fn migrate_principal(server: &Server, account_id: u32) -> trc::Result<()> {
let num_params = migrate_encryption_params(server, account_id)
.await
.caused_by(trc::location!())?;
-let num_subscriptions = migrate_push_subscriptions(server, account_id)
+let num_subscriptions = migrate_push_subscriptions_v011(server, account_id)
.await
.caused_by(trc::location!())?;
-let num_sieve = migrate_sieve(server, account_id)
+let num_sieve = migrate_sieve_v011(server, account_id)
.await
.caused_by(trc::location!())?;
let num_submissions = migrate_email_submissions(server, account_id)


@@ -0,0 +1,539 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::time::Instant;
use common::Server;
use directory::{Principal, PrincipalData, Type};
use proc_macros::EnumMethods;
use store::{
Serialize, ValueKey,
roaring::RoaringBitmap,
write::{AlignedBytes, Archive, Archiver, BatchBuilder, DirectoryClass, ValueClass},
};
use trc::AddContext;
use types::collection::Collection;
use crate::{
addressbook_v2::migrate_addressbook_v013,
calendar_v2::migrate_calendar_v013,
contact_v2::migrate_contacts_v013,
event_v2::{migrate_calendar_events_v013, migrate_calendar_scheduling_v013},
push_v2::migrate_push_subscriptions_v013,
sieve_v2::migrate_sieve_v013,
};
pub(crate) async fn migrate_principals_v0_13(server: &Server) -> trc::Result<RoaringBitmap> {
// Obtain email ids
let principal_ids = server
.get_document_ids(u32::MAX, Collection::Principal)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
let num_principals = principal_ids.len();
if num_principals == 0 {
return Ok(principal_ids);
}
let mut num_migrated = 0;
for principal_id in principal_ids.iter() {
match server
.store()
.get_value::<Archive<AlignedBytes>>(ValueKey {
account_id: u32::MAX,
collection: Collection::Principal.into(),
document_id: principal_id,
class: ValueClass::Directory(DirectoryClass::Principal(principal_id)),
})
.await
{
Ok(Some(legacy)) => match legacy.deserialize_untrusted::<PrincipalV2>() {
Ok(old_principal) => {
let mut principal = Principal {
id: principal_id,
typ: old_principal.typ,
name: old_principal.name,
data: Vec::new(),
};
for secret in old_principal.secrets {
principal.data.push(PrincipalData::Secret(secret));
}
for (idx, email) in old_principal.emails.into_iter().enumerate() {
if idx == 0 {
principal.data.push(PrincipalData::PrimaryEmail(email));
} else {
principal.data.push(PrincipalData::EmailAlias(email));
}
}
if let Some(description) = old_principal.description {
principal.data.push(PrincipalData::Description(description));
}
if let Some(quota) = old_principal.quota
&& quota > 0
{
principal.data.push(PrincipalData::DiskQuota(quota));
}
if let Some(tenant) = old_principal.tenant {
principal.data.push(PrincipalData::Tenant(tenant));
}
for item in old_principal.data {
match item {
PrincipalDataV2::MemberOf(items) => {
for item in items {
principal.data.push(PrincipalData::MemberOf(item));
}
}
PrincipalDataV2::Roles(items) => {
for item in items {
principal.data.push(PrincipalData::Role(item));
}
}
PrincipalDataV2::Lists(items) => {
for item in items {
principal.data.push(PrincipalData::List(item));
}
}
PrincipalDataV2::Permissions(items) => {
for item in items {
principal.data.push(PrincipalData::Permission {
permission_id: item.permission.id(),
grant: item.grant,
});
}
}
PrincipalDataV2::Picture(item) => {
principal.data.push(PrincipalData::Picture(item));
}
PrincipalDataV2::ExternalMembers(items) => {
for item in items {
principal.data.push(PrincipalData::ExternalMember(item));
}
}
PrincipalDataV2::Urls(items) => {
for item in items {
principal.data.push(PrincipalData::Url(item));
}
}
PrincipalDataV2::PrincipalQuota(items) => {
for item in items {
principal.data.push(PrincipalData::DirectoryQuota {
quota: item.quota as u32,
typ: item.typ,
});
}
}
PrincipalDataV2::Locale(item) => {
principal.data.push(PrincipalData::Locale(item));
}
}
}
principal.sort();
let mut batch = BatchBuilder::new();
batch
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.update_document(principal_id);
batch.set(
ValueClass::Directory(DirectoryClass::Principal(principal_id)),
Archiver::new(principal)
.serialize()
.caused_by(trc::location!())?,
);
num_migrated += 1;
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
}
Err(_) => {
if let Err(err) = legacy.deserialize_untrusted::<Principal>() {
return Err(err.account_id(principal_id).caused_by(trc::location!()));
}
}
},
Ok(None) => (),
Err(err) => {
return Err(err.account_id(principal_id).caused_by(trc::location!()));
}
}
}
if num_migrated > 0 {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Migrated {num_migrated} principals",)
);
}
Ok(principal_ids)
}
pub(crate) async fn migrate_principal_v0_13(server: &Server, account_id: u32) -> trc::Result<()> {
let start_time = Instant::now();
let num_push = migrate_push_subscriptions_v013(server, account_id)
.await
.caused_by(trc::location!())?;
let num_sieve = migrate_sieve_v013(server, account_id)
.await
.caused_by(trc::location!())?;
let num_calendars = migrate_calendar_v013(server, account_id)
.await
.caused_by(trc::location!())?;
let num_events = migrate_calendar_events_v013(server, account_id)
.await
.caused_by(trc::location!())?;
let num_event_scheduling = migrate_calendar_scheduling_v013(server, account_id)
.await
.caused_by(trc::location!())?;
let num_books = migrate_addressbook_v013(server, account_id)
.await
.caused_by(trc::location!())?;
let num_contacts = migrate_contacts_v013(server, account_id)
.await
.caused_by(trc::location!())?;
if num_sieve > 0
|| num_books > 0
|| num_contacts > 0
|| num_calendars > 0
|| num_events > 0
|| num_push > 0
|| num_event_scheduling > 0
{
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!(
"Migrated accountId {account_id} with {num_sieve} sieve scripts, {num_push} push subscriptions, {num_calendars} calendars, {num_events} calendar events, {num_event_scheduling} event scheduling, {num_books} address books and {num_contacts} contacts"
),
Elapsed = start_time.elapsed()
);
}
Ok(())
}
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)]
pub struct PrincipalV2 {
pub id: u32,
pub typ: Type,
pub name: String,
pub description: Option<String>,
pub secrets: Vec<String>,
pub emails: Vec<String>,
pub quota: Option<u64>,
pub tenant: Option<u32>,
pub data: Vec<PrincipalDataV2>,
}
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)]
pub enum PrincipalDataV2 {
MemberOf(Vec<u32>),
Roles(Vec<u32>),
Lists(Vec<u32>),
Permissions(Vec<PermissionGrantV2>),
Picture(String),
ExternalMembers(Vec<String>),
Urls(Vec<String>),
PrincipalQuota(Vec<PrincipalQuotaV2>),
Locale(String),
}
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)]
pub struct PrincipalQuotaV2 {
pub quota: u64,
pub typ: Type,
}
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)]
pub struct PermissionGrantV2 {
pub permission: PermissionV2,
pub grant: bool,
}
#[derive(
rkyv::Archive,
rkyv::Deserialize,
rkyv::Serialize,
Debug,
Clone,
Copy,
PartialEq,
Eq,
Hash,
serde::Serialize,
serde::Deserialize,
EnumMethods,
)]
#[serde(rename_all = "kebab-case")]
pub enum PermissionV2 {
// WARNING: add new ids at the end (TODO: use static ids)
// Admin
Impersonate,
UnlimitedRequests,
UnlimitedUploads,
DeleteSystemFolders,
MessageQueueList,
MessageQueueGet,
MessageQueueUpdate,
MessageQueueDelete,
OutgoingReportList,
OutgoingReportGet,
OutgoingReportDelete,
IncomingReportList,
IncomingReportGet,
IncomingReportDelete,
SettingsList,
SettingsUpdate,
SettingsDelete,
SettingsReload,
IndividualList,
IndividualGet,
IndividualUpdate,
IndividualDelete,
IndividualCreate,
GroupList,
GroupGet,
GroupUpdate,
GroupDelete,
GroupCreate,
DomainList,
DomainGet,
DomainCreate,
DomainUpdate,
DomainDelete,
TenantList,
TenantGet,
TenantCreate,
TenantUpdate,
TenantDelete,
MailingListList,
MailingListGet,
MailingListCreate,
MailingListUpdate,
MailingListDelete,
RoleList,
RoleGet,
RoleCreate,
RoleUpdate,
RoleDelete,
PrincipalList,
PrincipalGet,
PrincipalCreate,
PrincipalUpdate,
PrincipalDelete,
BlobFetch,
PurgeBlobStore,
PurgeDataStore,
PurgeInMemoryStore,
PurgeAccount,
FtsReindex,
Undelete,
DkimSignatureCreate,
DkimSignatureGet,
SpamFilterUpdate,
WebadminUpdate,
LogsView,
SpamFilterTrain,
Restart,
TracingList,
TracingGet,
TracingLive,
MetricsList,
MetricsLive,
// Generic
Authenticate,
AuthenticateOauth,
EmailSend,
EmailReceive,
// Account Management
ManageEncryption,
ManagePasswords,
// JMAP
JmapEmailGet,
JmapMailboxGet,
JmapThreadGet,
JmapIdentityGet,
JmapEmailSubmissionGet,
JmapPushSubscriptionGet,
JmapSieveScriptGet,
JmapVacationResponseGet,
JmapPrincipalGet,
JmapQuotaGet,
JmapBlobGet,
JmapEmailSet,
JmapMailboxSet,
JmapIdentitySet,
JmapEmailSubmissionSet,
JmapPushSubscriptionSet,
JmapSieveScriptSet,
JmapVacationResponseSet,
JmapEmailChanges,
JmapMailboxChanges,
JmapThreadChanges,
JmapIdentityChanges,
JmapEmailSubmissionChanges,
JmapQuotaChanges,
JmapEmailCopy,
JmapBlobCopy,
JmapEmailImport,
JmapEmailParse,
JmapEmailQueryChanges,
JmapMailboxQueryChanges,
JmapEmailSubmissionQueryChanges,
JmapSieveScriptQueryChanges,
JmapPrincipalQueryChanges,
JmapQuotaQueryChanges,
JmapEmailQuery,
JmapMailboxQuery,
JmapEmailSubmissionQuery,
JmapSieveScriptQuery,
JmapPrincipalQuery,
JmapQuotaQuery,
JmapSearchSnippet,
JmapSieveScriptValidate,
JmapBlobLookup,
JmapBlobUpload,
JmapEcho,
// IMAP
ImapAuthenticate,
ImapAclGet,
ImapAclSet,
ImapMyRights,
ImapListRights,
ImapAppend,
ImapCapability,
ImapId,
ImapCopy,
ImapMove,
ImapCreate,
ImapDelete,
ImapEnable,
ImapExpunge,
ImapFetch,
ImapIdle,
ImapList,
ImapLsub,
ImapNamespace,
ImapRename,
ImapSearch,
ImapSort,
ImapSelect,
ImapExamine,
ImapStatus,
ImapStore,
ImapSubscribe,
ImapThread,
// POP3
Pop3Authenticate,
Pop3List,
Pop3Uidl,
Pop3Stat,
Pop3Retr,
Pop3Dele,
// ManageSieve
SieveAuthenticate,
SieveListScripts,
SieveSetActive,
SieveGetScript,
SievePutScript,
SieveDeleteScript,
SieveRenameScript,
SieveCheckScript,
SieveHaveSpace,
// API keys
ApiKeyList,
ApiKeyGet,
ApiKeyCreate,
ApiKeyUpdate,
ApiKeyDelete,
// OAuth clients
OauthClientList,
OauthClientGet,
OauthClientCreate,
OauthClientUpdate,
OauthClientDelete,
// OAuth client registration
OauthClientRegistration,
OauthClientOverride,
AiModelInteract,
Troubleshoot,
SpamFilterClassify,
// WebDAV permissions
DavSyncCollection,
DavExpandProperty,
DavPrincipalAcl,
DavPrincipalList,
DavPrincipalMatch,
DavPrincipalSearch,
DavPrincipalSearchPropSet,
DavFilePropFind,
DavFilePropPatch,
DavFileGet,
DavFileMkCol,
DavFileDelete,
DavFilePut,
DavFileCopy,
DavFileMove,
DavFileLock,
DavFileAcl,
DavCardPropFind,
DavCardPropPatch,
DavCardGet,
DavCardMkCol,
DavCardDelete,
DavCardPut,
DavCardCopy,
DavCardMove,
DavCardLock,
DavCardAcl,
DavCardQuery,
DavCardMultiGet,
DavCalPropFind,
DavCalPropPatch,
DavCalGet,
DavCalMkCol,
DavCalDelete,
DavCalPut,
DavCalCopy,
DavCalMove,
DavCalLock,
DavCalAcl,
DavCalQuery,
DavCalMultiGet,
DavCalFreeBusyQuery,
CalendarAlarms,
CalendarSchedulingSend,
CalendarSchedulingReceive,
// WARNING: add new ids at the end (TODO: use static ids)
}
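
The two WARNING comments bracket this enum because rkyv archives fieldless variants by declaration position: appending keeps every discriminant already written to disk valid, while inserting or reordering silently shifts the meaning of stored permissions. Illustratively (hypothetical enum, not from this commit):

// v1: enum Perm { Read, Write }          // Write archived as tag 1
// v2: enum Perm { Read, Admin, Write }   // old tag 1 now decodes as Admin
// v2: enum Perm { Read, Write, Admin }   // appending keeps old archives correct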


@@ -8,20 +8,23 @@ use super::object::Object;
use crate::object::{FromLegacy, Property, Value};
use base64::{Engine, engine::general_purpose};
use common::Server;
-use email::push::{Keys, PushSubscription};
+use email::push::{Keys, PushSubscription, PushSubscriptions};
use store::{
Serialize, ValueKey,
-write::{AlignedBytes, Archive, Archiver, BatchBuilder, ValueClass},
+write::{Archiver, BatchBuilder, ValueClass},
};
use trc::AddContext;
-use types::{collection::Collection, field::Field, type_state::DataType};
+use types::{
+collection::Collection,
+field::{Field, PrincipalField},
+type_state::DataType,
+};

-pub(crate) async fn migrate_push_subscriptions(
+pub(crate) async fn migrate_push_subscriptions_v011(
server: &Server,
account_id: u32,
) -> trc::Result<u64> {
// Obtain email ids
-let todo = "fix";
let push_subscription_ids = server
.get_document_ids(account_id, Collection::PushSubscription)
.await
@@ -31,7 +34,7 @@ pub(crate) async fn migrate_push_subscriptions(
if num_push_subscriptions == 0 {
return Ok(0);
}
-let mut did_migrate = false;
+let mut subscriptions = Vec::with_capacity(num_push_subscriptions as usize);
for push_subscription_id in &push_subscription_ids {
match server
@@ -45,62 +48,50 @@ pub(crate) async fn migrate_push_subscriptions(
.await
{
Ok(Some(legacy)) => {
-let mut batch = BatchBuilder::new();
-batch
-.with_account_id(account_id)
-.with_collection(Collection::PushSubscription)
-.update_document(push_subscription_id)
-.set(
-Field::ARCHIVE,
-Archiver::new(PushSubscription::from_legacy(legacy))
-.serialize()
-.caused_by(trc::location!())?,
-);
-did_migrate = true;
-server
-.store()
-.write(batch.build_all())
-.await
-.caused_by(trc::location!())?;
+let mut subscription = PushSubscription::from_legacy(legacy);
+subscription.id = push_subscription_id;
+subscriptions.push(subscription);
}
Ok(None) => (),
Err(err) => {
-if server
-.store()
-.get_value::<Archive<AlignedBytes>>(ValueKey {
-account_id,
-collection: Collection::PushSubscription.into(),
-document_id: push_subscription_id,
-class: ValueClass::Property(Field::ARCHIVE.into()),
-})
-.await
-.is_err()
-{
-return Err(err
-.account_id(account_id)
-.document_id(push_subscription_id)
-.caused_by(trc::location!()));
-}
+return Err(err
+.account_id(account_id)
+.document_id(push_subscription_id)
+.caused_by(trc::location!()));
}
}
}
-// Increment document id counter
-if did_migrate {
+if !subscriptions.is_empty() {
+// Save changes
+let num_push_subscriptions = subscriptions.len() as u64;
+let mut batch = BatchBuilder::new();
+batch
+.with_account_id(u32::MAX)
+.with_collection(Collection::PushSubscription)
+.create_document(account_id)
+.with_account_id(account_id);
+for subscription in &subscriptions {
+batch.delete_document(subscription.id).clear(Field::ARCHIVE);
+}
+batch
+.with_collection(Collection::Principal)
+.update_document(0)
+.set(
+PrincipalField::PushSubscriptions,
+Archiver::new(PushSubscriptions { subscriptions })
+.serialize()
+.caused_by(trc::location!())?,
+);
server
-.store()
-.assign_document_ids(
-account_id,
-Collection::PushSubscription,
-push_subscription_ids
-.max()
-.map(|id| id as u64)
-.unwrap_or(num_push_subscriptions)
-+ 1,
-)
+.commit_batch(batch)
.await
.caused_by(trc::location!())?;
Ok(num_push_subscriptions)
} else {
Ok(0)


@@ -0,0 +1,122 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::Server;
use email::push::{Keys, PushSubscription, PushSubscriptions};
use store::{
Serialize,
write::{Archiver, BatchBuilder, now},
};
use trc::AddContext;
use types::{
collection::Collection,
field::{Field, PrincipalField},
type_state::DataType,
};
use utils::map::bitmap::Bitmap;
pub(crate) async fn migrate_push_subscriptions_v013(
server: &Server,
account_id: u32,
) -> trc::Result<u64> {
// Obtain email ids
let push_ids = server
.get_document_ids(account_id, Collection::PushSubscription)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
let num_pushes = push_ids.len();
if num_pushes == 0 {
return Ok(0);
}
let mut subscriptions = Vec::with_capacity(num_pushes as usize);
for push_id in &push_ids {
match server
.get_archive(account_id, Collection::PushSubscription, push_id)
.await
{
Ok(Some(legacy)) => match legacy.deserialize_untrusted::<PushSubscriptionV2>() {
Ok(old_push) => {
subscriptions.push(PushSubscription {
id: push_id,
url: old_push.url,
device_client_id: old_push.device_client_id,
expires: old_push.expires,
verification_code: old_push.verification_code,
verified: old_push.verified,
types: old_push.types,
keys: old_push.keys,
email_push: Vec::new(),
});
}
Err(err) => {
return Err(err.account_id(push_id).caused_by(trc::location!()));
}
},
Ok(None) => (),
Err(err) => {
return Err(err.account_id(push_id).caused_by(trc::location!()));
}
}
}
if !subscriptions.is_empty() {
// Save changes
let num_push_subscriptions = subscriptions.len() as u64;
let now = now();
let mut batch = BatchBuilder::new();
// Delete archived and document ids
batch
.with_account_id(account_id)
.with_collection(Collection::PushSubscription)
.create_document(account_id);
for subscription in &subscriptions {
batch.delete_document(subscription.id).clear(Field::ARCHIVE);
}
subscriptions.retain(|s| s.verified && s.expires > now);
if !subscriptions.is_empty() {
batch
.with_account_id(u32::MAX)
.with_collection(Collection::PushSubscription)
.create_document(account_id)
.with_account_id(account_id)
.with_collection(Collection::Principal)
.update_document(0)
.set(
PrincipalField::PushSubscriptions,
Archiver::new(PushSubscriptions { subscriptions })
.serialize()
.caused_by(trc::location!())?,
);
}
server
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
Ok(num_push_subscriptions)
} else {
Ok(0)
}
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Default, Debug, Clone, PartialEq, Eq,
)]
pub struct PushSubscriptionV2 {
pub url: String,
pub device_client_id: String,
pub expires: u64,
pub verification_code: String,
pub verified: bool,
pub types: Bitmap<DataType>,
pub keys: Option<Keys>,
}
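
Note the ordering in the batch above: every legacy per-document subscription is deleted first, and only entries that are still verified and unexpired at migration time are re-archived under PrincipalField::PushSubscriptions on the account's Principal document (document 0). Stale subscriptions are thus dropped in the same write rather than carried forward.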


@@ -17,24 +17,11 @@ use store::{
};
use trc::{AddContext, StoreEvent};
use types::{
-blob_hash::BlobHash,
collection::Collection,
field::{Field, PrincipalField, SieveField},
};

-#[derive(
-rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
-)]
-#[rkyv(derive(Debug))]
-pub struct LegacySieveScript {
-pub name: String,
-pub is_active: bool,
-pub blob_hash: BlobHash,
-pub size: u32,
-pub vacation_response: Option<VacationResponse>,
-}
-
-pub(crate) async fn migrate_sieve(server: &Server, account_id: u32) -> trc::Result<u64> {
+pub(crate) async fn migrate_sieve_v011(server: &Server, account_id: u32) -> trc::Result<u64> {
// Obtain email ids
let script_ids = server
.get_document_ids(account_id, Collection::SieveScript)


@@ -0,0 +1,101 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use common::Server;
use email::sieve::{SieveScript, VacationResponse};
use store::{
Serialize, SerializeInfallible,
write::{Archiver, BatchBuilder},
};
use trc::AddContext;
use types::{
blob_hash::BlobHash,
collection::Collection,
field::{Field, PrincipalField},
};
pub(crate) async fn migrate_sieve_v013(server: &Server, account_id: u32) -> trc::Result<u64> {
// Obtain email ids
let script_ids = server
.get_document_ids(account_id, Collection::SieveScript)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
let num_scripts = script_ids.len();
if num_scripts == 0 {
return Ok(0);
}
let mut num_migrated = 0;
for script_id in &script_ids {
match server
.get_archive(account_id, Collection::SieveScript, script_id)
.await
{
Ok(Some(legacy)) => match legacy.deserialize_untrusted::<SieveScriptV2>() {
Ok(old_sieve) => {
let script = SieveScript {
name: old_sieve.name,
blob_hash: old_sieve.blob_hash,
size: old_sieve.size,
vacation_response: old_sieve.vacation_response,
};
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::SieveScript)
.update_document(script_id)
.unindex(Field::new(0u8), vec![u8::from(old_sieve.is_active)])
.set(
Field::ARCHIVE,
Archiver::new(script)
.serialize()
.caused_by(trc::location!())?,
);
if old_sieve.is_active {
batch
.with_account_id(account_id)
.with_collection(Collection::Principal)
.update_document(0)
.set(PrincipalField::ActiveScriptId, script_id.serialize());
}
num_migrated += 1;
server
.store()
.write(batch.build_all())
.await
.caused_by(trc::location!())?;
}
Err(_) => {
if let Err(err) = legacy.deserialize_untrusted::<SieveScript>() {
return Err(err.account_id(script_id).caused_by(trc::location!()));
}
}
},
Ok(None) => (),
Err(err) => {
return Err(err.account_id(script_id).caused_by(trc::location!()));
}
}
}
Ok(num_migrated)
}
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
)]
#[rkyv(derive(Debug))]
pub struct SieveScriptV2 {
pub name: String,
pub is_active: bool,
pub blob_hash: BlobHash,
pub size: u32,
pub vacation_response: Option<VacationResponse>,
}


@@ -0,0 +1,176 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::{
LOCK_RETRY_TIME, LOCK_WAIT_TIME_ACCOUNT, LOCK_WAIT_TIME_CORE,
changelog::reset_changelog,
principal_v1::{migrate_principal_v0_11, migrate_principals_v0_11},
queue::migrate_queue_v011,
report::migrate_reports,
};
use common::{KV_LOCK_HOUSEKEEPER, Server};
use store::{
dispatch::lookup::KeyValue,
rand::{self, seq::SliceRandom},
};
use trc::AddContext;
use types::collection::Collection;
pub(crate) async fn migrate_v0_11(server: &Server) -> trc::Result<()> {
let force_lock = std::env::var("FORCE_LOCK").is_ok();
let in_memory = server.in_memory_store();
let principal_ids;
loop {
if force_lock
|| in_memory
.try_lock(
KV_LOCK_HOUSEKEEPER,
b"migrate_core_lock",
LOCK_WAIT_TIME_CORE,
)
.await
.caused_by(trc::location!())?
{
if in_memory
.key_get::<()>(KeyValue::<()>::build_key(
KV_LOCK_HOUSEKEEPER,
b"migrate_core_done",
))
.await
.caused_by(trc::location!())?
.is_none()
{
migrate_queue_v011(server)
.await
.caused_by(trc::location!())?;
migrate_reports(server).await.caused_by(trc::location!())?;
reset_changelog(server).await.caused_by(trc::location!())?;
principal_ids = migrate_principals_v0_11(server)
.await
.caused_by(trc::location!())?;
in_memory
.key_set(
KeyValue::new(
KeyValue::<()>::build_key(KV_LOCK_HOUSEKEEPER, b"migrate_core_done"),
b"1".to_vec(),
)
.expires(86400),
)
.await
.caused_by(trc::location!())?;
} else {
principal_ids = server
.get_document_ids(u32::MAX, Collection::Principal)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Migration completed by another node.",)
);
}
in_memory
.remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock")
.await
.caused_by(trc::location!())?;
break;
} else {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Migration lock busy, waiting 30 seconds.",)
);
tokio::time::sleep(LOCK_RETRY_TIME).await;
}
}
if !principal_ids.is_empty() {
let mut principal_ids = principal_ids.into_iter().collect::<Vec<_>>();
principal_ids.shuffle(&mut rand::rng());
loop {
let mut skipped_principal_ids = Vec::new();
let mut num_migrated = 0;
for principal_id in principal_ids {
let lock_key = format!("migrate_{principal_id}_lock");
let done_key = format!("migrate_{principal_id}_done");
if force_lock
|| in_memory
.try_lock(
KV_LOCK_HOUSEKEEPER,
lock_key.as_bytes(),
LOCK_WAIT_TIME_ACCOUNT,
)
.await
.caused_by(trc::location!())?
{
if in_memory
.key_get::<()>(KeyValue::<()>::build_key(
KV_LOCK_HOUSEKEEPER,
done_key.as_bytes(),
))
.await
.caused_by(trc::location!())?
.is_none()
{
migrate_principal_v0_11(server, principal_id)
.await
.caused_by(trc::location!())?;
num_migrated += 1;
in_memory
.key_set(
KeyValue::new(
KeyValue::<()>::build_key(
KV_LOCK_HOUSEKEEPER,
done_key.as_bytes(),
),
b"1".to_vec(),
)
.expires(86400),
)
.await
.caused_by(trc::location!())?;
}
in_memory
.remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes())
.await
.caused_by(trc::location!())?;
} else {
skipped_principal_ids.push(principal_id);
}
}
if !skipped_principal_ids.is_empty() {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!(
"Migrated {num_migrated} accounts and {} are locked by another node, waiting 30 seconds.",
skipped_principal_ids.len()
)
);
tokio::time::sleep(LOCK_RETRY_TIME).await;
principal_ids = skipped_principal_ids;
} else {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Account migration completed.",)
);
break;
}
}
}
Ok(())
}


@@ -0,0 +1,61 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::{
LOCK_RETRY_TIME, LOCK_WAIT_TIME_CORE, event_v1::migrate_calendar_events_v012,
queue::migrate_queue_v012, tasks::migrate_tasks_v011,
};
use common::{KV_LOCK_HOUSEKEEPER, Server};
use trc::AddContext;
pub(crate) async fn migrate_v0_12(server: &Server, migrate_tasks: bool) -> trc::Result<()> {
let force_lock = std::env::var("FORCE_LOCK").is_ok();
let in_memory = server.in_memory_store();
loop {
if force_lock
|| in_memory
.try_lock(
KV_LOCK_HOUSEKEEPER,
b"migrate_core_lock",
LOCK_WAIT_TIME_CORE,
)
.await
.caused_by(trc::location!())?
{
migrate_queue_v012(server)
.await
.caused_by(trc::location!())?;
if migrate_tasks {
migrate_tasks_v011(server)
.await
.caused_by(trc::location!())?;
}
in_memory
.remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock")
.await
.caused_by(trc::location!())?;
break;
} else {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Migration lock busy, waiting 30 seconds.",)
);
tokio::time::sleep(LOCK_RETRY_TIME).await;
}
}
if migrate_tasks {
migrate_calendar_events_v012(server)
.await
.caused_by(trc::location!())
} else {
Ok(())
}
}


@@ -0,0 +1,168 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::{
LOCK_RETRY_TIME, LOCK_WAIT_TIME_ACCOUNT, LOCK_WAIT_TIME_CORE,
principal_v2::{migrate_principal_v0_13, migrate_principals_v0_13},
};
use common::{KV_LOCK_HOUSEKEEPER, Server};
use store::{
dispatch::lookup::KeyValue,
rand::{self, seq::SliceRandom},
};
use trc::AddContext;
use types::collection::Collection;
pub(crate) async fn migrate_v0_13(server: &Server) -> trc::Result<()> {
let force_lock = std::env::var("FORCE_LOCK").is_ok();
let in_memory = server.in_memory_store();
let principal_ids;
loop {
if force_lock
|| in_memory
.try_lock(
KV_LOCK_HOUSEKEEPER,
b"migrate_core_lock",
LOCK_WAIT_TIME_CORE,
)
.await
.caused_by(trc::location!())?
{
if in_memory
.key_get::<()>(KeyValue::<()>::build_key(
KV_LOCK_HOUSEKEEPER,
b"migrate_core_done",
))
.await
.caused_by(trc::location!())?
.is_none()
{
principal_ids = migrate_principals_v0_13(server)
.await
.caused_by(trc::location!())?;
in_memory
.key_set(
KeyValue::new(
KeyValue::<()>::build_key(KV_LOCK_HOUSEKEEPER, b"migrate_core_done"),
b"1".to_vec(),
)
.expires(86400),
)
.await
.caused_by(trc::location!())?;
} else {
principal_ids = server
.get_document_ids(u32::MAX, Collection::Principal)
.await
.caused_by(trc::location!())?
.unwrap_or_default();
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Migration completed by another node.",)
);
}
in_memory
.remove_lock(KV_LOCK_HOUSEKEEPER, b"migrate_core_lock")
.await
.caused_by(trc::location!())?;
break;
} else {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Migration lock busy, waiting 30 seconds.",)
);
tokio::time::sleep(LOCK_RETRY_TIME).await;
}
}
if !principal_ids.is_empty() {
let mut principal_ids = principal_ids.into_iter().collect::<Vec<_>>();
principal_ids.shuffle(&mut rand::rng());
loop {
let mut skipped_principal_ids = Vec::new();
let mut num_migrated = 0;
for principal_id in principal_ids {
let lock_key = format!("migrate_{principal_id}_lock");
let done_key = format!("migrate_{principal_id}_done");
if force_lock
|| in_memory
.try_lock(
KV_LOCK_HOUSEKEEPER,
lock_key.as_bytes(),
LOCK_WAIT_TIME_ACCOUNT,
)
.await
.caused_by(trc::location!())?
{
if in_memory
.key_get::<()>(KeyValue::<()>::build_key(
KV_LOCK_HOUSEKEEPER,
done_key.as_bytes(),
))
.await
.caused_by(trc::location!())?
.is_none()
{
migrate_principal_v0_13(server, principal_id)
.await
.caused_by(trc::location!())?;
num_migrated += 1;
in_memory
.key_set(
KeyValue::new(
KeyValue::<()>::build_key(
KV_LOCK_HOUSEKEEPER,
done_key.as_bytes(),
),
b"1".to_vec(),
)
.expires(86400),
)
.await
.caused_by(trc::location!())?;
}
in_memory
.remove_lock(KV_LOCK_HOUSEKEEPER, lock_key.as_bytes())
.await
.caused_by(trc::location!())?;
} else {
skipped_principal_ids.push(principal_id);
}
}
if !skipped_principal_ids.is_empty() {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!(
"Migrated {num_migrated} accounts and {} are locked by another node, waiting 30 seconds.",
skipped_principal_ids.len()
)
);
tokio::time::sleep(LOCK_RETRY_TIME).await;
principal_ids = skipped_principal_ids;
} else {
trc::event!(
Server(trc::ServerEvent::Startup),
Details = format!("Account migration completed.",)
);
break;
}
}
}
Ok(())
}


@@ -234,6 +234,10 @@ impl From<EmailSubmissionField> for Field {
impl Field {
pub const ARCHIVE: Field = Field(ARCHIVE_FIELD);
+
+pub fn new(value: u8) -> Self {
+Field(value)
+}
}

impl FieldType for Field {}


@@ -80,7 +80,7 @@ async fn jmap_tests() {
server::webhooks::test(&mut params).await;

-mail::get::test(&mut params).await;
+/*mail::get::test(&mut params).await;
mail::set::test(&mut params).await;
mail::parse::test(&mut params).await;
mail::query::test(&mut params, delete).await;
@@ -105,7 +105,7 @@ async fn jmap_tests() {
auth::limits::test(&mut params).await;
auth::oauth::test(&mut params).await;
-auth::quota::test(&mut params).await;
+auth::quota::test(&mut params).await;*/
auth::permissions::test(&params).await;
contacts::addressbook::test(&mut params).await;