Migration tool to generate the correct next id (#1561)

mdecimus 2025-05-26 20:42:18 +02:00
parent 1df88433b2
commit 1de410de08
10 changed files with 157 additions and 62 deletions


@@ -2,6 +2,19 @@
All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/).
## [0.12.1] - 2025-05-26
If you are upgrading from v0.11.x, this version includes **breaking changes** to the database layout and requires a migration. Please read the [UPGRADING.md](UPGRADING.md) file for more information on how to upgrade from previous versions.
## Added
## Changed
## Fixed
- Migration tool to generate the correct next id (#1561).
- Failed to parse setting dav.lock.max-timeout (closes #1559).
- Failed to build OpenTelemetry span exporter: no http client specified (#1571).
## [0.12.0] - 2025-05-26
This version includes **breaking changes** to the database layout and requires a migration. Please read the [UPGRADING.md](UPGRADING.md) file for more information on how to upgrade from previous versions.
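
The recurring fix in the migration files below: the document-id counter was previously seeded with `item count + 1`, which can undershoot when document ids are sparse (deleted items leave gaps, so the highest id may exceed the count), letting newly assigned ids collide with migrated ones. Each call site now seeds the counter with the highest migrated id plus one, falling back to the count only when no ids exist; the accompanying `for id in ids` to `for id in &ids` changes iterate by reference so the id set is still available for the final `.max()` call. A minimal sketch of the idiom, assuming the id sets are `roaring::RoaringBitmap`s as in the migration code (the helper name is hypothetical, for illustration only):

```rust
use roaring::RoaringBitmap;

/// Hypothetical helper showing the next-id computation used at each
/// `assign_document_ids` call site in this commit.
fn next_document_id(ids: &RoaringBitmap, num_items: u64) -> u64 {
    // RoaringBitmap::max() returns the largest stored id as Option<u32>;
    // an empty bitmap falls back to the item count, which preserves the
    // previous `num_items + 1` behavior.
    ids.max().map(|id| id as u64).unwrap_or(num_items) + 1
}
```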


@@ -106,14 +106,6 @@ pub(crate) async fn migrate_emails(server: &Server, account_id: u32) -> trc::Res
message_data.entry(message_id).or_default().thread_id = thread_id;
}
// Obtain changeIds
/*for (message_id, change_id) in
get_properties::<u64, _, _>(server, account_id, Collection::Email, &(), Property::Cid)
.await.caused_by(trc::location!())?
{
message_data.entry(message_id).or_default().change_id = change_id;
}*/
// Write message data
for (message_id, data) in message_data {
if !tombstoned_ids.contains(message_id) {
@@ -145,7 +137,7 @@ pub(crate) async fn migrate_emails(server: &Server, account_id: u32) -> trc::Res
}
// Migrate message metadata
for message_id in message_ids {
for message_id in &message_ids {
match server
.store()
.get_value::<LegacyBincode<LegacyMessageMetadata>>(ValueKey {
@@ -304,7 +296,11 @@ pub(crate) async fn migrate_emails(server: &Server, account_id: u32) -> trc::Res
if did_migrate {
server
.store()
.assign_document_ids(account_id, Collection::Email, num_emails + 1)
.assign_document_ids(
account_id,
Collection::Email,
message_ids.max().map(|id| id as u64).unwrap_or(num_emails) + 1,
)
.await
.caused_by(trc::location!())?;
Ok(num_emails)


@@ -28,7 +28,7 @@ pub(crate) async fn migrate_identities(server: &Server, account_id: u32) -> trc:
}
let mut did_migrate = false;
for identity_id in identity_ids {
for identity_id in &identity_ids {
match server
.store()
.get_value::<Object<Value>>(ValueKey {
@@ -86,7 +86,15 @@ pub(crate) async fn migrate_identities(server: &Server, account_id: u32) -> trc:
if did_migrate {
server
.store()
.assign_document_ids(account_id, Collection::Identity, num_identities + 1)
.assign_document_ids(
account_id,
Collection::Identity,
identity_ids
.max()
.map(|id| id as u64)
.unwrap_or(num_identities)
+ 1,
)
.await
.caused_by(trc::location!())?;
Ok(num_identities)


@@ -32,7 +32,7 @@ pub(crate) async fn migrate_mailboxes(server: &Server, account_id: u32) -> trc::
}
let mut did_migrate = false;
for mailbox_id in mailbox_ids {
for mailbox_id in &mailbox_ids {
match server
.store()
.get_value::<Object<Value>>(ValueKey {
@@ -114,7 +114,15 @@ pub(crate) async fn migrate_mailboxes(server: &Server, account_id: u32) -> trc::
if did_migrate {
server
.store()
.assign_document_ids(account_id, Collection::Mailbox, num_mailboxes + 1)
.assign_document_ids(
account_id,
Collection::Mailbox,
mailbox_ids
.max()
.map(|id| id as u64)
.unwrap_or(num_mailboxes)
+ 1,
)
.await
.caused_by(trc::location!())?;
Ok(num_mailboxes)


@@ -100,7 +100,15 @@ pub(crate) async fn migrate_principals(server: &Server) -> trc::Result<RoaringBi
if num_migrated > 0 {
server
.store()
.assign_document_ids(u32::MAX, Collection::Principal, num_principals + 1)
.assign_document_ids(
u32::MAX,
Collection::Principal,
principal_ids
.max()
.map(|id| id as u64)
.unwrap_or(num_principals)
+ 1,
)
.await
.caused_by(trc::location!())?;


@@ -34,7 +34,7 @@ pub(crate) async fn migrate_push_subscriptions(
}
let mut did_migrate = false;
for push_subscription_id in push_subscription_ids {
for push_subscription_id in &push_subscription_ids {
match server
.store()
.get_value::<Object<Value>>(ValueKey {
@@ -94,7 +94,11 @@
.assign_document_ids(
account_id,
Collection::PushSubscription,
num_push_subscriptions + 1,
push_subscription_ids
.max()
.map(|id| id as u64)
.unwrap_or(num_push_subscriptions)
+ 1,
)
.await
.caused_by(trc::location!())?;


@@ -55,7 +55,7 @@ pub(crate) async fn migrate_sieve(server: &Server, account_id: u32) -> trc::Resu
.caused_by(trc::location!())?;
}
for script_id in script_ids {
for script_id in &script_ids {
match server
.store()
.get_value::<Object<Value>>(ValueKey {
@@ -154,7 +154,11 @@ pub(crate) async fn migrate_sieve(server: &Server, account_id: u32) -> trc::Resu
if did_migrate {
server
.store()
.assign_document_ids(account_id, Collection::SieveScript, num_scripts + 1)
.assign_document_ids(
account_id,
Collection::SieveScript,
script_ids.max().map(|id| id as u64).unwrap_or(num_scripts) + 1,
)
.await
.caused_by(trc::location!())?;
Ok(num_scripts)


@@ -62,7 +62,7 @@ pub(crate) async fn migrate_email_submissions(
.caused_by(trc::location!())?;
}
for email_submission_id in email_submission_ids {
for email_submission_id in &email_submission_ids {
match server
.store()
.get_value::<Object<Value>>(ValueKey {
@@ -126,7 +126,11 @@
.assign_document_ids(
account_id,
Collection::EmailSubmission,
num_email_submissions + 1,
email_submission_ids
.max()
.map(|id| id as u64)
.unwrap_or(num_email_submissions)
+ 1,
)
.await
.caused_by(trc::location!())?;


@@ -50,7 +50,11 @@ pub(crate) async fn migrate_threads(server: &Server, account_id: u32) -> trc::Re
// Increment document id counter
server
.store()
.assign_document_ids(account_id, Collection::Thread, num_threads + 1)
.assign_document_ids(
account_id,
Collection::Thread,
thread_ids.max().map(|id| id as u64).unwrap_or(num_threads) + 1,
)
.await
.caused_by(trc::location!())?;


@@ -4,18 +4,21 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use rusqlite::{OptionalExtension, TransactionBehavior, params};
use super::{SqliteStore, into_error};
use crate::{
IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_QUOTA, U64_LEN,
write::{AssignedIds, Batch, BitmapClass, Operation, ValueClass, ValueOp},
};
use super::{SqliteStore, into_error};
use rusqlite::{OptionalExtension, TransactionBehavior, params};
use trc::AddContext;
impl SqliteStore {
pub(crate) async fn write(&self, batch: Batch<'_>) -> trc::Result<AssignedIds> {
let mut conn = self.conn_pool.get().map_err(into_error)?;
let mut conn = self
.conn_pool
.get()
.map_err(into_error)
.caused_by(trc::location!())?;
self.spawn_worker(move || {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
@@ -23,7 +26,8 @@ impl SqliteStore {
let mut change_id = 0u64;
let trx = conn
.transaction_with_behavior(TransactionBehavior::Immediate)
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
let mut result = AssignedIds::default();
let has_changes = !batch.changes.is_empty();
@@ -36,9 +40,11 @@ impl SqliteStore {
"ON CONFLICT(k) DO UPDATE SET v = v + ",
"excluded.v RETURNING v"
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.query_row(params![&key, &1i64], |row| row.get::<_, i64>(0))
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
result.push_change_id(account_id, change_id as u64);
}
}
@@ -81,9 +87,11 @@ impl SqliteStore {
"INSERT OR REPLACE INTO {} (k, v) VALUES (?, ?)",
table
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([&key, value])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
ValueOp::AtomicAdd(by) => {
if *by >= 0 {
@@ -94,16 +102,20 @@ impl SqliteStore {
),
table
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute(params![&key, *by])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
} else {
trx.prepare_cached(&format!(
"UPDATE {table} SET v = v + ? WHERE k = ?"
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute(params![*by, &key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
}
ValueOp::AddAndGet(by) => {
@@ -116,16 +128,20 @@ impl SqliteStore {
),
table
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.query_row(params![&key, &*by], |row| row.get::<_, i64>(0))
.map_err(into_error)?,
.map_err(into_error)
.caused_by(trc::location!())?,
);
}
ValueOp::Clear => {
trx.prepare_cached(&format!("DELETE FROM {} WHERE k = ?", table))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([&key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
}
}
@@ -141,14 +157,18 @@ impl SqliteStore {
if *set {
trx.prepare_cached("INSERT OR IGNORE INTO i (k) VALUES (?)")
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([&key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
} else {
trx.prepare_cached("DELETE FROM i WHERE k = ?")
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([&key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
}
Operation::Bitmap { class, set } => {
@@ -159,23 +179,29 @@ impl SqliteStore {
if *set {
if is_document_id {
trx.prepare_cached("INSERT INTO b (k) VALUES (?)")
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute(params![&key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
} else {
trx.prepare_cached(&format!(
"INSERT OR IGNORE INTO {} (k) VALUES (?)",
table
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute(params![&key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
} else {
trx.prepare_cached(&format!("DELETE FROM {} WHERE k = ?", table))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute(params![&key])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
};
}
Operation::Log { collection, set } => {
@@ -187,9 +213,11 @@ impl SqliteStore {
.serialize(0);
trx.prepare_cached("INSERT OR REPLACE INTO l (k, v) VALUES (?, ?)")
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([&key, set])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
Operation::AssertValue {
class,
@@ -200,16 +228,22 @@ impl SqliteStore {
let matches = trx
.prepare_cached(&format!("SELECT v FROM {} WHERE k = ?", table))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.query_row([&key], |row| {
Ok(assert_value.matches(row.get_ref(0)?.as_bytes()?))
})
.optional()
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.unwrap_or_else(|| assert_value.is_none());
if !matches {
trx.rollback().map_err(into_error)?;
return Err(trc::StoreEvent::AssertValueFailed.into());
trx.rollback()
.map_err(into_error)
.caused_by(trc::location!())?;
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!()));
}
}
}
@@ -221,13 +255,19 @@ impl SqliteStore {
}
pub(crate) async fn purge_store(&self) -> trc::Result<()> {
let conn = self.conn_pool.get().map_err(into_error)?;
let conn = self
.conn_pool
.get()
.map_err(into_error)
.caused_by(trc::location!())?;
self.spawn_worker(move || {
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER] {
conn.prepare_cached(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
}
Ok(())
@@ -236,15 +276,21 @@ impl SqliteStore {
}
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> trc::Result<()> {
let conn = self.conn_pool.get().map_err(into_error)?;
let conn = self
.conn_pool
.get()
.map_err(into_error)
.caused_by(trc::location!())?;
self.spawn_worker(move || {
conn.prepare_cached(&format!(
"DELETE FROM {} WHERE k >= ? AND k < ?",
char::from(from.subspace()),
))
.map_err(into_error)?
.map_err(into_error)
.caused_by(trc::location!())?
.execute([from.serialize(0), to.serialize(0)])
.map_err(into_error)?;
.map_err(into_error)
.caused_by(trc::location!())?;
Ok(())
})
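
The SQLite backend changes above are mechanical by comparison: every `.map_err(into_error)?` gains a `.caused_by(trc::location!())`, so store errors record the exact call site as they propagate (hence the new `use trc::AddContext;` import). Below is a minimal model of the idiom using hypothetical stand-in types; `trc`'s actual `Result`, `AddContext`, and `location!` are not reproduced here:

```rust
// Hypothetical stand-ins modeling the `.map_err(into_error)
// .caused_by(trc::location!())` chain from the diff.
type Result<T> = std::result::Result<T, String>;

trait AddContext<T> {
    /// Attach a call-site label to an error, mirroring trc's
    /// `caused_by(trc::location!())`.
    fn caused_by(self, location: &'static str) -> Result<T>;
}

impl<T> AddContext<T> for Result<T> {
    fn caused_by(self, location: &'static str) -> Result<T> {
        self.map_err(|e| format!("{e}\n  caused by: {location}"))
    }
}

// Stand-ins for a fallible driver call and its error conversion.
fn prepare(_sql: &str) -> std::result::Result<(), std::io::Error> {
    Err(std::io::Error::other("no such table: c"))
}
fn into_error(e: std::io::Error) -> String {
    e.to_string()
}

fn main() {
    // file!() and line!() expand to literals, so concat! builds a
    // static location string, roughly what trc::location!() provides.
    let err = prepare("DELETE FROM c WHERE v = 0")
        .map_err(into_error)
        .caused_by(concat!(file!(), ":", line!()))
        .unwrap_err();
    println!("{err}");
}
```

Attaching context at each fallible statement, rather than once per function, is what makes the repeated pattern worth the verbosity: a failure in any of the dozen driver calls pinpoints its own line instead of the enclosing `write` or `purge_store`.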