Retry logic for SQL read replicas
Some checks failed
trivy / Check (push) Failing after -7m47s

This commit is contained in:
mdecimus 2024-08-13 17:17:38 +02:00
parent 6b92961c36
commit be656ddd0c
3 changed files with 174 additions and 21 deletions

View file

@ -9,6 +9,7 @@
*/
use std::{
future::Future,
ops::Range,
sync::atomic::{AtomicUsize, Ordering},
};
@ -37,7 +38,15 @@ impl SQLReadReplica {
.collect::<Vec<_>>();
let primary = if let Some(store) = stores.stores.get(&primary_id) {
store.clone()
if store.is_pg_or_mysql() {
store.clone()
} else {
config.new_build_error(
(&prefix, "primary"),
"Primary store must be a PostgreSQL or MySQL store",
);
return None;
}
} else {
config.new_build_error(
(&prefix, "primary"),
@ -48,7 +57,15 @@ impl SQLReadReplica {
let mut replicas = Vec::with_capacity(replica_ids.len());
for replica_id in replica_ids {
if let Some(store) = stores.stores.get(&replica_id) {
replicas.push(store.clone());
if store.is_pg_or_mysql() {
replicas.push(store.clone());
} else {
config.new_build_error(
(&prefix, "replicas"),
"Replica store must be a PostgreSQL or MySQL store",
);
return None;
}
} else {
config.new_build_error(
(&prefix, "replicas"),
@ -69,61 +86,186 @@ impl SQLReadReplica {
}
}
#[inline(always)]
fn replica(&self) -> &Store {
&self.replicas[self.last_used_replica.fetch_add(1, Ordering::Relaxed) % self.replicas.len()]
async fn run_op<'x, F, T, R>(&'x self, f: F) -> trc::Result<T>
where
F: Fn(&'x Store) -> R,
R: Future<Output = trc::Result<T>>,
T: 'static,
{
let mut last_error = None;
for store in [
&self.replicas
[self.last_used_replica.fetch_add(1, Ordering::Relaxed) % self.replicas.len()],
&self.primary,
] {
match f(store).await {
Ok(result) => return Ok(result),
Err(err) => {
if err.is_assertion_failure() {
return Err(err);
} else {
last_error = Some(err);
}
}
}
}
Err(last_error.unwrap())
}
pub async fn get_blob(&self, key: &[u8], range: Range<usize>) -> trc::Result<Option<Vec<u8>>> {
Box::pin(self.replica().get_blob(key, range)).await
self.run_op(move |store| {
let range = range.clone();
async move {
match store {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.get_blob(key, range).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.get_blob(key, range).await,
_ => panic!("Invalid store type"),
}
}
})
.await
}
pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> trc::Result<()> {
Box::pin(self.primary.put_blob(key, data)).await
match &self.primary {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.put_blob(key, data).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.put_blob(key, data).await,
_ => panic!("Invalid store type"),
}
}
pub async fn delete_blob(&self, key: &[u8]) -> trc::Result<bool> {
Box::pin(self.primary.delete_blob(key)).await
match &self.primary {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.delete_blob(key).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.delete_blob(key).await,
_ => panic!("Invalid store type"),
}
}
pub async fn get_value<U>(&self, key: impl Key) -> trc::Result<Option<U>>
where
U: Deserialize + 'static,
{
Box::pin(self.replica().get_value(key)).await
self.run_op(move |store| {
let key = key.clone();
async move {
match store {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.get_value(key).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.get_value(key).await,
_ => panic!("Invalid store type"),
}
}
})
.await
}
pub async fn get_bitmap(
&self,
key: BitmapKey<BitmapClass<u32>>,
) -> trc::Result<Option<RoaringBitmap>> {
Box::pin(self.replica().get_bitmap(key)).await
self.run_op(move |store| {
let key = key.clone();
async move {
match store {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.get_bitmap(key).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.get_bitmap(key).await,
_ => panic!("Invalid store type"),
}
}
})
.await
}
pub async fn iterate<T: Key>(
&self,
params: IterateParams<T>,
cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result<bool> + Sync + Send,
mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> trc::Result<bool> + Sync + Send,
) -> trc::Result<()> {
Box::pin(self.replica().iterate(params, cb)).await
let mut last_error = None;
for store in [
&self.replicas
[self.last_used_replica.fetch_add(1, Ordering::Relaxed) % self.replicas.len()],
&self.primary,
] {
match match store {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.iterate(params.clone(), &mut cb).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.iterate(params.clone(), &mut cb).await,
_ => panic!("Invalid store type"),
} {
Ok(result) => return Ok(result),
Err(err) => {
last_error = Some(err);
}
}
}
Err(last_error.unwrap())
}
pub async fn get_counter(
&self,
key: impl Into<ValueKey<ValueClass<u32>>> + Sync + Send,
) -> trc::Result<i64> {
Box::pin(self.replica().get_counter(key)).await
let key = key.into();
self.run_op(move |store| {
let key = key.clone();
async move {
match store {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.get_counter(key).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.get_counter(key).await,
_ => panic!("Invalid store type"),
}
}
})
.await
}
pub async fn write(&self, batch: Batch) -> trc::Result<AssignedIds> {
Box::pin(self.primary.write(batch)).await
match &self.primary {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.write(batch).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.write(batch).await,
_ => panic!("Invalid store type"),
}
}
pub async fn delete_range(&self, from: impl Key, to: impl Key) -> trc::Result<()> {
Box::pin(self.primary.delete_range(from, to)).await
match &self.primary {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.delete_range(from, to).await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.delete_range(from, to).await,
_ => panic!("Invalid store type"),
}
}
pub async fn purge_store(&self) -> trc::Result<()> {
Box::pin(self.primary.purge_store()).await
match &self.primary {
#[cfg(feature = "postgres")]
Store::PostgreSQL(store) => store.purge_store().await,
#[cfg(feature = "mysql")]
Store::MySQL(store) => store.purge_store().await,
_ => panic!("Invalid store type"),
}
}
}

View file

@ -57,7 +57,7 @@ pub trait Serialize {
// Key serialization flags
pub(crate) const WITH_SUBSPACE: u32 = 1;
pub trait Key: Sync + Send {
pub trait Key: Sync + Send + Clone {
fn serialize(&self, flags: u32) -> Vec<u8>;
fn subspace(&self) -> u8;
}
@ -154,6 +154,7 @@ pub const SUBSPACE_RESERVED_3: u8 = b'x';
pub const SUBSPACE_RESERVED_4: u8 = b'y';
pub const SUBSPACE_RESERVED_5: u8 = b'z';
#[derive(Clone)]
pub struct IterateParams<T: Key> {
begin: T,
end: T,
@ -673,6 +674,16 @@ impl Store {
_ => false,
}
}
pub fn is_pg_or_mysql(&self) -> bool {
match self {
#[cfg(feature = "sqlite")]
Store::SQLite(_) => true,
#[cfg(feature = "postgres")]
Store::PostgreSQL(_) => true,
_ => false,
}
}
}
impl std::fmt::Debug for Store {

View file

@ -208,7 +208,7 @@ impl Key for LogKey {
}
}
impl<T: AsRef<ValueClass<u32>> + Sync + Send> Key for ValueKey<T> {
impl<T: AsRef<ValueClass<u32>> + Sync + Send + Clone> Key for ValueKey<T> {
fn subspace(&self) -> u8 {
self.class.as_ref().subspace(self.collection)
}
@ -371,7 +371,7 @@ impl<T: ResolveId> ValueClass<T> {
}
}
impl<T: AsRef<[u8]> + Sync + Send> Key for IndexKey<T> {
impl<T: AsRef<[u8]> + Sync + Send + Clone> Key for IndexKey<T> {
fn subspace(&self) -> u8 {
SUBSPACE_INDEXES
}
@ -395,7 +395,7 @@ impl<T: AsRef<[u8]> + Sync + Send> Key for IndexKey<T> {
}
}
impl<T: AsRef<BitmapClass<u32>> + Sync + Send> Key for BitmapKey<T> {
impl<T: AsRef<BitmapClass<u32>> + Sync + Send + Clone> Key for BitmapKey<T> {
fn subspace(&self) -> u8 {
self.class.as_ref().subspace()
}
@ -486,7 +486,7 @@ impl<T: ResolveId> BitmapClass<T> {
}
}
impl<T: AsRef<[u8]> + Sync + Send> Key for AnyKey<T> {
impl<T: AsRef<[u8]> + Sync + Send + Clone> Key for AnyKey<T> {
fn serialize(&self, flags: u32) -> Vec<u8> {
let key = self.key.as_ref();
if (flags & WITH_SUBSPACE) != 0 {