Renew old/expired FDB read transactions after the 1007 error code is received rather than estimating expiration time

This commit is contained in:
mdecimus 2025-06-08 11:45:04 +02:00
parent c5d4a65ce3
commit 227d47a1c3
2 changed files with 53 additions and 78 deletions

View file

@ -4,10 +4,9 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use foundationdb::{Database, FdbError, api::NetworkAutoStop};
use std::time::{Duration, Instant};
use foundationdb::{Database, FdbError, Transaction, api::NetworkAutoStop};
pub mod blob;
pub mod main;
pub mod read;
@ -15,7 +14,6 @@ pub mod write;
const MAX_VALUE_SIZE: usize = 100000;
pub const TRANSACTION_EXPIRY: Duration = Duration::from_secs(1);
pub const TRANSACTION_TIMEOUT: Duration = Duration::from_secs(4);
#[allow(dead_code)]
pub struct FdbStore {
@ -24,11 +22,6 @@ pub struct FdbStore {
version: parking_lot::Mutex<ReadVersion>,
}
pub(crate) struct TimedTransaction {
trx: Transaction,
expires: Instant,
}
pub(crate) struct ReadVersion {
version: i64,
expires: Instant,
@ -56,25 +49,6 @@ impl Default for ReadVersion {
}
}
impl AsRef<Transaction> for TimedTransaction {
fn as_ref(&self) -> &Transaction {
&self.trx
}
}
impl TimedTransaction {
pub fn new(trx: Transaction) -> Self {
Self {
trx,
expires: Instant::now() + TRANSACTION_TIMEOUT,
}
}
pub fn is_expired(&self) -> bool {
self.expires < Instant::now()
}
}
#[inline(always)]
fn into_error(error: FdbError) -> trc::Error {
trc::StoreEvent::FoundationdbError

View file

@ -4,14 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use foundationdb::{
KeySelector, RangeOption, Transaction,
future::FdbSlice,
options::{self, StreamingMode},
};
use futures::TryStreamExt;
use roaring::RoaringBitmap;
use super::{FdbStore, MAX_VALUE_SIZE, ReadVersion, into_error};
use crate::{
BitmapKey, Deserialize, IterateParams, Key, U32_LEN, ValueKey, WITH_SUBSPACE,
backend::deserialize_i64_le,
@ -20,8 +13,13 @@ use crate::{
key::{DeserializeBigEndian, KeySerializer},
},
};
use super::{FdbStore, MAX_VALUE_SIZE, ReadVersion, TimedTransaction, into_error};
use foundationdb::{
KeySelector, RangeOption, Transaction,
future::FdbSlice,
options::{self, StreamingMode},
};
use futures::TryStreamExt;
use roaring::RoaringBitmap;
#[allow(dead_code)]
pub(crate) enum ChunkedValue {
@ -81,51 +79,61 @@ impl FdbStore {
params: IterateParams<T>,
mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result<bool> + Sync + Send,
) -> trc::Result<()> {
let mut begin = params.begin.serialize(WITH_SUBSPACE);
let begin = params.begin.serialize(WITH_SUBSPACE);
let end = params.end.serialize(WITH_SUBSPACE);
if !params.first {
let mut begin_selector = KeySelector::first_greater_or_equal(&begin);
let mut last_key = vec![];
loop {
let mut last_key_bytes = None;
'outer: loop {
let begin_selector = if last_key.is_empty() {
KeySelector::first_greater_or_equal(&begin)
} else {
KeySelector::first_greater_than(&last_key)
};
{
let trx = self.timed_read_trx().await?;
let mut values = trx.as_ref().get_ranges(
RangeOption {
begin: begin_selector,
end: KeySelector::first_greater_than(&end),
mode: options::StreamingMode::WantAll,
reverse: !params.ascending,
..Default::default()
},
true,
);
let trx = self.read_trx().await?;
let mut values = trx.get_ranges(
RangeOption {
begin: begin_selector,
end: KeySelector::first_greater_than(&end),
mode: options::StreamingMode::WantAll,
reverse: !params.ascending,
..Default::default()
},
true,
);
while let Some(values) = values.try_next().await.map_err(into_error)? {
let mut last_key = &[] as &[u8];
for value in values.iter() {
last_key = value.key();
if !cb(last_key.get(1..).unwrap_or_default(), value.value())? {
return Ok(());
let mut last_key_ = vec![];
loop {
match values.try_next().await {
Ok(Some(values)) => {
let mut key = &[] as &[u8];
for value in values.iter() {
key = value.key();
if !cb(key.get(1..).unwrap_or_default(), value.value())? {
return Ok(());
}
}
if values.more() {
last_key_ = key.to_vec();
}
}
if values.more() && trx.is_expired() {
last_key_bytes = last_key.to_vec().into();
break;
Ok(None) => {
break 'outer;
}
Err(e) => {
if e.code() == 1007 && !last_key_.is_empty() {
// Transaction is too old to perform reads or be committed
drop(values);
last_key = last_key_;
continue 'outer;
} else {
return Err(into_error(e));
}
}
}
}
if let Some(last_key_bytes) = last_key_bytes {
begin = last_key_bytes;
begin_selector = KeySelector::first_greater_than(&begin);
} else {
break;
}
}
} else {
let trx = self.read_trx().await?;
@ -182,13 +190,6 @@ impl FdbStore {
Ok(trx)
}
pub(crate) async fn timed_read_trx(&self) -> trc::Result<TimedTransaction> {
self.db
.create_trx()
.map_err(into_error)
.map(TimedTransaction::new)
}
}
pub(crate) async fn read_chunked_value(