Refresh old FoundationDB read transactions (closes #520)

mdecimus 2024-07-05 20:09:17 +02:00
parent a9cffb8d1e
commit b53c3d120e
4 changed files with 178 additions and 25 deletions
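
FoundationDB rejects reads on transactions whose read version is more than about five seconds old (transaction_too_old), so a long range scan cannot run inside a single read transaction. This change gives each read transaction a four-second budget (TRANSACTION_TIMEOUT): when an iteration outlives that budget, it records the last key it returned and restarts the range read in a fresh transaction just past that key. Below is a condensed sketch of that pattern, assuming the foundationdb and futures crates roughly as they are used in the patch; the function and variable names are illustrative, not the ones in the diff.

// Sketch only: restart a FoundationDB range scan whenever the read
// transaction gets close to the engine's ~5 s age limit.
use std::time::{Duration, Instant};

use foundationdb::{options, Database, FdbError, KeySelector, RangeOption};
use futures::TryStreamExt;

const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(4);

async fn scan_all(db: &Database, begin: Vec<u8>, end: Vec<u8>) -> Result<(), FdbError> {
    // Key to resume after when the previous transaction expired mid-scan.
    let mut resume_after: Option<Vec<u8>> = None;

    loop {
        // Fresh read transaction per pass, kept safely under the 5 s limit.
        let trx = db.create_trx()?;
        let deadline = Instant::now() + TRANSACTION_TIMEOUT;

        let begin_selector = match &resume_after {
            // Resume strictly after the last key already handed to the caller.
            Some(last) => KeySelector::first_greater_than(last.clone()),
            None => KeySelector::first_greater_or_equal(begin.clone()),
        };

        let mut values = trx.get_ranges(
            RangeOption {
                begin: begin_selector,
                end: KeySelector::first_greater_than(end.clone()),
                mode: options::StreamingMode::WantAll,
                ..Default::default()
            },
            true,
        );

        let mut expired = false;
        while let Some(batch) = values.try_next().await? {
            for value in batch.iter() {
                resume_after = Some(value.key().to_vec());
                // ... hand value.key() / value.value() to the caller here ...
            }
            // More data left but the transaction is getting old: restart.
            if batch.more() && Instant::now() >= deadline {
                expired = true;
                break;
            }
        }

        if !expired {
            return Ok(()); // Range exhausted within a single transaction.
        }
    }
}

The trade-off is that the scan no longer comes from a single consistent snapshot: keys written or deleted between passes may be seen or missed.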

View file

@@ -6,7 +6,7 @@
 use std::time::{Duration, Instant};
-use foundationdb::{api::NetworkAutoStop, Database, FdbError};
+use foundationdb::{api::NetworkAutoStop, Database, FdbError, Transaction};
 use crate::Error;
@@ -16,6 +16,8 @@ pub mod read;
 pub mod write;
 const MAX_VALUE_SIZE: usize = 100000;
+pub const TRANSACTION_EXPIRY: Duration = Duration::from_secs(1);
+pub const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(4);
 #[allow(dead_code)]
 pub struct FdbStore {
@@ -24,6 +26,11 @@ pub struct FdbStore {
     version: parking_lot::Mutex<ReadVersion>,
 }
+pub(crate) struct TimedTransaction {
+    trx: Transaction,
+    expires: Instant,
+}
+
 pub(crate) struct ReadVersion {
     version: i64,
     expires: Instant,
@@ -33,7 +40,7 @@ impl ReadVersion {
     pub fn new(version: i64) -> Self {
         Self {
             version,
-            expires: Instant::now() + Duration::from_secs(1),
+            expires: Instant::now() + TRANSACTION_EXPIRY,
         }
     }
@@ -51,6 +58,25 @@ impl Default for ReadVersion {
     }
 }
+impl AsRef<Transaction> for TimedTransaction {
+    fn as_ref(&self) -> &Transaction {
+        &self.trx
+    }
+}
+
+impl TimedTransaction {
+    pub fn new(trx: Transaction) -> Self {
+        Self {
+            trx,
+            expires: Instant::now() + TRANSACTION_TIMEOUT,
+        }
+    }
+
+    pub fn is_expired(&self) -> bool {
+        self.expires < Instant::now()
+    }
+}
+
 impl From<FdbError> for Error {
     fn from(error: FdbError) -> Self {
         Self::InternalError(format!("FoundationDB error: {}", error.message()))

View file

@@ -21,7 +21,7 @@ use crate::{
     BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, WITH_SUBSPACE,
 };
-use super::{FdbStore, ReadVersion, MAX_VALUE_SIZE};
+use super::{FdbStore, ReadVersion, TimedTransaction, MAX_VALUE_SIZE};
 #[allow(dead_code)]
 pub(crate) enum ChunkedValue {
@@ -81,31 +81,69 @@ impl FdbStore {
         params: IterateParams<T>,
         mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result<bool> + Sync + Send,
     ) -> crate::Result<()> {
-        let begin = params.begin.serialize(WITH_SUBSPACE);
+        let mut begin = params.begin.serialize(WITH_SUBSPACE);
         let end = params.end.serialize(WITH_SUBSPACE);
 
-        let trx = self.read_trx().await?;
-        let mut values = trx.get_ranges_keyvalues(
-            RangeOption {
-                begin: KeySelector::first_greater_or_equal(&begin),
-                end: KeySelector::first_greater_than(&end),
-                mode: if params.first {
-                    options::StreamingMode::Small
+        if !params.first {
+            let mut has_more = true;
+            let mut begin_selector = KeySelector::first_greater_or_equal(&begin);
+
+            while has_more {
+                let mut last_key_bytes = None;
+
+                {
+                    let trx = self.timed_read_trx().await?;
+                    let mut values = trx.as_ref().get_ranges(
+                        RangeOption {
+                            begin: begin_selector,
+                            end: KeySelector::first_greater_than(&end),
+                            mode: options::StreamingMode::WantAll,
+                            reverse: !params.ascending,
+                            ..Default::default()
+                        },
+                        true,
+                    );
+
+                    while let Some(values) = values.try_next().await? {
+                        has_more = values.more();
+                        let mut last_key = &[] as &[u8];
+
+                        for value in values.iter() {
+                            last_key = value.key();
+                            if !cb(last_key.get(1..).unwrap_or_default(), value.value())? {
+                                return Ok(());
+                            }
+                        }
+
+                        if has_more && trx.is_expired() {
+                            last_key_bytes = last_key.to_vec().into();
+                            break;
+                        }
+                    }
+                }
+
+                if let Some(last_key_bytes) = last_key_bytes {
+                    begin = last_key_bytes;
+                    begin_selector = KeySelector::first_greater_than(&begin);
                 } else {
-                    options::StreamingMode::WantAll
+                    break;
+                }
+            }
+        } else {
+            let trx = self.read_trx().await?;
+            let mut values = trx.get_ranges_keyvalues(
+                RangeOption {
+                    begin: KeySelector::first_greater_or_equal(&begin),
+                    end: KeySelector::first_greater_than(&end),
+                    mode: options::StreamingMode::Small,
+                    reverse: !params.ascending,
+                    ..Default::default()
                 },
-                reverse: !params.ascending,
-                ..Default::default()
-            },
-            true,
-        );
+                true,
+            );
 
-        while let Some(value) = values.try_next().await? {
-            let key = value.key().get(1..).unwrap_or_default();
-            let value = value.value();
-
-            if !cb(key, value)? || params.first {
-                return Ok(());
+            if let Some(value) = values.try_next().await? {
+                cb(value.key().get(1..).unwrap_or_default(), value.value())?;
             }
         }
@@ -140,6 +178,13 @@ impl FdbStore {
         Ok(trx)
     }
+
+    pub(crate) async fn timed_read_trx(&self) -> crate::Result<TimedTransaction> {
+        self.db
+            .create_trx()
+            .map_err(Into::into)
+            .map(TimedTransaction::new)
+    }
 }
 pub(crate) async fn read_chunked_value(

View file

@@ -160,6 +160,7 @@ pub async fn test() {
     pop3.send("DELE 2").await;
     pop3.assert_read(ResponseType::Ok).await;
     pop3.send("QUIT").await;
+    pop3.assert_read(ResponseType::Ok).await;
     let mut pop3 = Pop3Connection::connect_and_login().await;
     pop3.send("STAT").await;
     pop3.assert_read(ResponseType::Ok)
@@ -179,6 +180,7 @@ pub async fn test() {
     pop3.assert_read(ResponseType::Ok).await;
     pop3.assert_read(ResponseType::Ok).await;
     pop3.send("QUIT").await;
+    pop3.assert_read(ResponseType::Ok).await;
     let mut pop3 = Pop3Connection::connect_and_login().await;
     pop3.send("STAT").await;
     pop3.assert_read(ResponseType::Ok)

View file

@@ -4,20 +4,100 @@
  * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
  */
-use std::collections::HashSet;
+use std::{collections::HashSet, time::Duration};
 use jmap_proto::types::{collection::Collection, property::Property};
 use store::{
     write::{
         BatchBuilder, BitmapClass, DirectoryClass, MaybeDynamicId, TagValue, ValueClass, F_CLEAR,
     },
-    BitmapKey, Store, ValueKey,
+    BitmapKey, IterateParams, Store, ValueKey,
 };
 // FDB max value
 const MAX_VALUE_SIZE: usize = 100000;
 pub async fn test(db: Store) {
+    #[cfg(feature = "foundationdb")]
+    if matches!(db, Store::FoundationDb(_)) && std::env::var("SLOW_FDB_TRX").is_ok() {
+        println!("Running slow FoundationDB transaction tests...");
+
+        // Create 900000 keys
+        let mut batch = BatchBuilder::new();
+        batch
+            .with_account_id(0)
+            .with_collection(0)
+            .update_document(0);
+        for n in 0..900000 {
+            batch.set(
+                ValueClass::Config(format!("key{n:10}").into_bytes()),
+                format!("value{n:10}").into_bytes(),
+            );
+            if n % 10000 == 0 {
+                db.write(batch.build_batch()).await.unwrap();
+                batch = BatchBuilder::new();
+                batch
+                    .with_account_id(0)
+                    .with_collection(0)
+                    .update_document(0);
+            }
+        }
+        db.write(batch.build_batch()).await.unwrap();
+        println!("Created 900.000 keys...");
+
+        // Iterate over all keys
+        let mut n = 0;
+        db.iterate(
+            IterateParams::new(
+                ValueKey {
+                    account_id: 0,
+                    collection: 0,
+                    document_id: 0,
+                    class: ValueClass::Config(b"".to_vec()),
+                },
+                ValueKey {
+                    account_id: 0,
+                    collection: 0,
+                    document_id: 0,
+                    class: ValueClass::Config(b"\xFF".to_vec()),
+                },
+            ),
+            |key, value| {
+                assert_eq!(std::str::from_utf8(key).unwrap(), format!("key{n:10}"));
+                assert_eq!(std::str::from_utf8(value).unwrap(), format!("value{n:10}"));
+                n += 1;
+                if n % 10000 == 0 {
+                    println!("Iterated over {n} keys");
+                    std::thread::sleep(Duration::from_millis(1000));
+                }
+                Ok(true)
+            },
+        )
+        .await
+        .unwrap();
+
+        // Delete all keys
+        let mut batch = BatchBuilder::new();
+        batch
+            .with_account_id(0)
+            .with_collection(0)
+            .update_document(0);
+        for n in 0..900000 {
+            batch.clear(ValueClass::Config(format!("key{n:10}").into_bytes()));
+            if n % 10000 == 0 {
+                db.write(batch.build_batch()).await.unwrap();
+                batch = BatchBuilder::new();
+                batch
+                    .with_account_id(0)
+                    .with_collection(0)
+                    .update_document(0);
+            }
+        }
+        db.write(batch.build_batch()).await.unwrap();
+    }
+
     // Testing ID assignment
     println!("Running dynamic ID assignment tests...");
     let mut builder = BatchBuilder::new();