diff --git a/crates/store/src/backend/foundationdb/mod.rs b/crates/store/src/backend/foundationdb/mod.rs index d5fd11f3..bd5c82f9 100644 --- a/crates/store/src/backend/foundationdb/mod.rs +++ b/crates/store/src/backend/foundationdb/mod.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; -use foundationdb::{api::NetworkAutoStop, Database, FdbError}; +use foundationdb::{api::NetworkAutoStop, Database, FdbError, Transaction}; use crate::Error; @@ -16,6 +16,8 @@ pub mod read; pub mod write; const MAX_VALUE_SIZE: usize = 100000; +pub const TRANSACTION_EXPIRY: Duration = Duration::from_secs(1); +pub const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(4); #[allow(dead_code)] pub struct FdbStore { @@ -24,6 +26,11 @@ pub struct FdbStore { version: parking_lot::Mutex, } +pub(crate) struct TimedTransaction { + trx: Transaction, + expires: Instant, +} + pub(crate) struct ReadVersion { version: i64, expires: Instant, @@ -33,7 +40,7 @@ impl ReadVersion { pub fn new(version: i64) -> Self { Self { version, - expires: Instant::now() + Duration::from_secs(1), + expires: Instant::now() + TRANSACTION_EXPIRY, } } @@ -51,6 +58,25 @@ impl Default for ReadVersion { } } +impl AsRef for TimedTransaction { + fn as_ref(&self) -> &Transaction { + &self.trx + } +} + +impl TimedTransaction { + pub fn new(trx: Transaction) -> Self { + Self { + trx, + expires: Instant::now() + TRANSACTION_TIMEOUT, + } + } + + pub fn is_expired(&self) -> bool { + self.expires < Instant::now() + } +} + impl From for Error { fn from(error: FdbError) -> Self { Self::InternalError(format!("FoundationDB error: {}", error.message())) diff --git a/crates/store/src/backend/foundationdb/read.rs b/crates/store/src/backend/foundationdb/read.rs index 2cfc054d..fcf6e4ae 100644 --- a/crates/store/src/backend/foundationdb/read.rs +++ b/crates/store/src/backend/foundationdb/read.rs @@ -21,7 +21,7 @@ use crate::{ BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, WITH_SUBSPACE, }; -use super::{FdbStore, ReadVersion, MAX_VALUE_SIZE}; +use super::{FdbStore, ReadVersion, TimedTransaction, MAX_VALUE_SIZE}; #[allow(dead_code)] pub(crate) enum ChunkedValue { @@ -81,31 +81,69 @@ impl FdbStore { params: IterateParams, mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result + Sync + Send, ) -> crate::Result<()> { - let begin = params.begin.serialize(WITH_SUBSPACE); + let mut begin = params.begin.serialize(WITH_SUBSPACE); let end = params.end.serialize(WITH_SUBSPACE); - let trx = self.read_trx().await?; - let mut values = trx.get_ranges_keyvalues( - RangeOption { - begin: KeySelector::first_greater_or_equal(&begin), - end: KeySelector::first_greater_than(&end), - mode: if params.first { - options::StreamingMode::Small + if !params.first { + let mut has_more = true; + let mut begin_selector = KeySelector::first_greater_or_equal(&begin); + + while has_more { + let mut last_key_bytes = None; + + { + let trx = self.timed_read_trx().await?; + let mut values = trx.as_ref().get_ranges( + RangeOption { + begin: begin_selector, + end: KeySelector::first_greater_than(&end), + mode: options::StreamingMode::WantAll, + reverse: !params.ascending, + ..Default::default() + }, + true, + ); + + while let Some(values) = values.try_next().await? { + has_more = values.more(); + let mut last_key = &[] as &[u8]; + + for value in values.iter() { + last_key = value.key(); + if !cb(last_key.get(1..).unwrap_or_default(), value.value())? { + return Ok(()); + } + } + + if has_more && trx.is_expired() { + last_key_bytes = last_key.to_vec().into(); + break; + } + } + } + + if let Some(last_key_bytes) = last_key_bytes { + begin = last_key_bytes; + begin_selector = KeySelector::first_greater_than(&begin); } else { - options::StreamingMode::WantAll + break; + } + } + } else { + let trx = self.read_trx().await?; + let mut values = trx.get_ranges_keyvalues( + RangeOption { + begin: KeySelector::first_greater_or_equal(&begin), + end: KeySelector::first_greater_than(&end), + mode: options::StreamingMode::Small, + reverse: !params.ascending, + ..Default::default() }, - reverse: !params.ascending, - ..Default::default() - }, - true, - ); + true, + ); - while let Some(value) = values.try_next().await? { - let key = value.key().get(1..).unwrap_or_default(); - let value = value.value(); - - if !cb(key, value)? || params.first { - return Ok(()); + if let Some(value) = values.try_next().await? { + cb(value.key().get(1..).unwrap_or_default(), value.value())?; } } @@ -140,6 +178,13 @@ impl FdbStore { Ok(trx) } + + pub(crate) async fn timed_read_trx(&self) -> crate::Result { + self.db + .create_trx() + .map_err(Into::into) + .map(TimedTransaction::new) + } } pub(crate) async fn read_chunked_value( diff --git a/tests/src/imap/pop.rs b/tests/src/imap/pop.rs index 489b407e..c1f04652 100644 --- a/tests/src/imap/pop.rs +++ b/tests/src/imap/pop.rs @@ -160,6 +160,7 @@ pub async fn test() { pop3.send("DELE 2").await; pop3.assert_read(ResponseType::Ok).await; pop3.send("QUIT").await; + pop3.assert_read(ResponseType::Ok).await; let mut pop3 = Pop3Connection::connect_and_login().await; pop3.send("STAT").await; pop3.assert_read(ResponseType::Ok) @@ -179,6 +180,7 @@ pub async fn test() { pop3.assert_read(ResponseType::Ok).await; pop3.assert_read(ResponseType::Ok).await; pop3.send("QUIT").await; + pop3.assert_read(ResponseType::Ok).await; let mut pop3 = Pop3Connection::connect_and_login().await; pop3.send("STAT").await; pop3.assert_read(ResponseType::Ok) diff --git a/tests/src/store/ops.rs b/tests/src/store/ops.rs index 0e8b3438..67f9f237 100644 --- a/tests/src/store/ops.rs +++ b/tests/src/store/ops.rs @@ -4,20 +4,100 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use std::collections::HashSet; +use std::{collections::HashSet, time::Duration}; use jmap_proto::types::{collection::Collection, property::Property}; use store::{ write::{ BatchBuilder, BitmapClass, DirectoryClass, MaybeDynamicId, TagValue, ValueClass, F_CLEAR, }, - BitmapKey, Store, ValueKey, + BitmapKey, IterateParams, Store, ValueKey, }; // FDB max value const MAX_VALUE_SIZE: usize = 100000; pub async fn test(db: Store) { + #[cfg(feature = "foundationdb")] + if matches!(db, Store::FoundationDb(_)) && std::env::var("SLOW_FDB_TRX").is_ok() { + println!("Running slow FoundationDB transaction tests..."); + + // Create 900000 keys + let mut batch = BatchBuilder::new(); + batch + .with_account_id(0) + .with_collection(0) + .update_document(0); + for n in 0..900000 { + batch.set( + ValueClass::Config(format!("key{n:10}").into_bytes()), + format!("value{n:10}").into_bytes(), + ); + + if n % 10000 == 0 { + db.write(batch.build_batch()).await.unwrap(); + batch = BatchBuilder::new(); + batch + .with_account_id(0) + .with_collection(0) + .update_document(0); + } + } + db.write(batch.build_batch()).await.unwrap(); + println!("Created 900.000 keys..."); + + // Iterate over all keys + let mut n = 0; + db.iterate( + IterateParams::new( + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Config(b"".to_vec()), + }, + ValueKey { + account_id: 0, + collection: 0, + document_id: 0, + class: ValueClass::Config(b"\xFF".to_vec()), + }, + ), + |key, value| { + assert_eq!(std::str::from_utf8(key).unwrap(), format!("key{n:10}")); + assert_eq!(std::str::from_utf8(value).unwrap(), format!("value{n:10}")); + n += 1; + if n % 10000 == 0 { + println!("Iterated over {n} keys"); + std::thread::sleep(Duration::from_millis(1000)); + } + Ok(true) + }, + ) + .await + .unwrap(); + + // Delete 100 keys + let mut batch = BatchBuilder::new(); + batch + .with_account_id(0) + .with_collection(0) + .update_document(0); + for n in 0..900000 { + batch.clear(ValueClass::Config(format!("key{n:10}").into_bytes())); + + if n % 10000 == 0 { + db.write(batch.build_batch()).await.unwrap(); + batch = BatchBuilder::new(); + batch + .with_account_id(0) + .with_collection(0) + .update_document(0); + } + } + db.write(batch.build_batch()).await.unwrap(); + } + // Testing ID assignment println!("Running dynamic ID assignment tests..."); let mut builder = BatchBuilder::new();