From 227d47a1c3fc763a32d5fa34383a0ec69365551a Mon Sep 17 00:00:00 2001 From: mdecimus Date: Sun, 8 Jun 2025 11:45:04 +0200 Subject: [PATCH] Renew old/expired FDB read transactions after the 1007 error code is received rather than estimating expiration time --- crates/store/src/backend/foundationdb/mod.rs | 28 +---- crates/store/src/backend/foundationdb/read.rs | 103 +++++++++--------- 2 files changed, 53 insertions(+), 78 deletions(-) diff --git a/crates/store/src/backend/foundationdb/mod.rs b/crates/store/src/backend/foundationdb/mod.rs index 0416ebdd..430d675f 100644 --- a/crates/store/src/backend/foundationdb/mod.rs +++ b/crates/store/src/backend/foundationdb/mod.rs @@ -4,10 +4,9 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ +use foundationdb::{Database, FdbError, api::NetworkAutoStop}; use std::time::{Duration, Instant}; -use foundationdb::{Database, FdbError, Transaction, api::NetworkAutoStop}; - pub mod blob; pub mod main; pub mod read; @@ -15,7 +14,6 @@ pub mod write; const MAX_VALUE_SIZE: usize = 100000; pub const TRANSACTION_EXPIRY: Duration = Duration::from_secs(1); -pub const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(4); #[allow(dead_code)] pub struct FdbStore { @@ -24,11 +22,6 @@ pub struct FdbStore { version: parking_lot::Mutex, } -pub(crate) struct TimedTransaction { - trx: Transaction, - expires: Instant, -} - pub(crate) struct ReadVersion { version: i64, expires: Instant, @@ -56,25 +49,6 @@ impl Default for ReadVersion { } } -impl AsRef for TimedTransaction { - fn as_ref(&self) -> &Transaction { - &self.trx - } -} - -impl TimedTransaction { - pub fn new(trx: Transaction) -> Self { - Self { - trx, - expires: Instant::now() + TRANSACTION_TIMEOUT, - } - } - - pub fn is_expired(&self) -> bool { - self.expires < Instant::now() - } -} - #[inline(always)] fn into_error(error: FdbError) -> trc::Error { trc::StoreEvent::FoundationdbError diff --git a/crates/store/src/backend/foundationdb/read.rs b/crates/store/src/backend/foundationdb/read.rs index f2fdc95d..53644d19 100644 --- a/crates/store/src/backend/foundationdb/read.rs +++ b/crates/store/src/backend/foundationdb/read.rs @@ -4,14 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ -use foundationdb::{ - KeySelector, RangeOption, Transaction, - future::FdbSlice, - options::{self, StreamingMode}, -}; -use futures::TryStreamExt; -use roaring::RoaringBitmap; - +use super::{FdbStore, MAX_VALUE_SIZE, ReadVersion, into_error}; use crate::{ BitmapKey, Deserialize, IterateParams, Key, U32_LEN, ValueKey, WITH_SUBSPACE, backend::deserialize_i64_le, @@ -20,8 +13,13 @@ use crate::{ key::{DeserializeBigEndian, KeySerializer}, }, }; - -use super::{FdbStore, MAX_VALUE_SIZE, ReadVersion, TimedTransaction, into_error}; +use foundationdb::{ + KeySelector, RangeOption, Transaction, + future::FdbSlice, + options::{self, StreamingMode}, +}; +use futures::TryStreamExt; +use roaring::RoaringBitmap; #[allow(dead_code)] pub(crate) enum ChunkedValue { @@ -81,51 +79,61 @@ impl FdbStore { params: IterateParams, mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result + Sync + Send, ) -> trc::Result<()> { - let mut begin = params.begin.serialize(WITH_SUBSPACE); + let begin = params.begin.serialize(WITH_SUBSPACE); let end = params.end.serialize(WITH_SUBSPACE); if !params.first { - let mut begin_selector = KeySelector::first_greater_or_equal(&begin); + let mut last_key = vec![]; - loop { - let mut last_key_bytes = None; + 'outer: loop { + let begin_selector = if last_key.is_empty() { + KeySelector::first_greater_or_equal(&begin) + } else { + KeySelector::first_greater_than(&last_key) + }; - { - let trx = self.timed_read_trx().await?; - let mut values = trx.as_ref().get_ranges( - RangeOption { - begin: begin_selector, - end: KeySelector::first_greater_than(&end), - mode: options::StreamingMode::WantAll, - reverse: !params.ascending, - ..Default::default() - }, - true, - ); + let trx = self.read_trx().await?; + let mut values = trx.get_ranges( + RangeOption { + begin: begin_selector, + end: KeySelector::first_greater_than(&end), + mode: options::StreamingMode::WantAll, + reverse: !params.ascending, + ..Default::default() + }, + true, + ); - while let Some(values) = values.try_next().await.map_err(into_error)? { - let mut last_key = &[] as &[u8]; - - for value in values.iter() { - last_key = value.key(); - if !cb(last_key.get(1..).unwrap_or_default(), value.value())? { - return Ok(()); + let mut last_key_ = vec![]; + loop { + match values.try_next().await { + Ok(Some(values)) => { + let mut key = &[] as &[u8]; + for value in values.iter() { + key = value.key(); + if !cb(key.get(1..).unwrap_or_default(), value.value())? { + return Ok(()); + } + } + if values.more() { + last_key_ = key.to_vec(); } } - - if values.more() && trx.is_expired() { - last_key_bytes = last_key.to_vec().into(); - break; + Ok(None) => { + break 'outer; + } + Err(e) => { + if e.code() == 1007 && !last_key_.is_empty() { + // Transaction is too old to perform reads or be committed + drop(values); + last_key = last_key_; + continue 'outer; + } else { + return Err(into_error(e)); + } } } } - - if let Some(last_key_bytes) = last_key_bytes { - begin = last_key_bytes; - begin_selector = KeySelector::first_greater_than(&begin); - } else { - break; - } } } else { let trx = self.read_trx().await?; @@ -182,13 +190,6 @@ impl FdbStore { Ok(trx) } - - pub(crate) async fn timed_read_trx(&self) -> trc::Result { - self.db - .create_trx() - .map_err(into_error) - .map(TimedTransaction::new) - } } pub(crate) async fn read_chunked_value(