mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-09-28 14:54:20 +08:00
delete_range to use impl Key
This commit is contained in:
parent
4301ec16d0
commit
78afc703f5
8 changed files with 85 additions and 94 deletions
|
@ -459,23 +459,12 @@ impl FdbStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_range(
|
||||
&self,
|
||||
subspace: u8,
|
||||
from_key: &[u8],
|
||||
to_key: &[u8],
|
||||
) -> crate::Result<()> {
|
||||
let from_key = KeySerializer::new(from_key.len() + 1)
|
||||
.write(subspace)
|
||||
.write(from_key)
|
||||
.finalize();
|
||||
let to_key = KeySerializer::new(to_key.len() + 1)
|
||||
.write(subspace)
|
||||
.write(to_key)
|
||||
.finalize();
|
||||
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
|
||||
let from = from.serialize(true);
|
||||
let to = to.serialize(true);
|
||||
|
||||
let trx = self.db.create_trx()?;
|
||||
trx.clear_range(&from_key, &to_key);
|
||||
trx.clear_range(&from, &to);
|
||||
trx.commit()
|
||||
.await
|
||||
.map_err(|err| FdbError::from(err).into())
|
||||
|
|
|
@ -291,21 +291,16 @@ impl MysqlStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_range(
|
||||
&self,
|
||||
subspace: u8,
|
||||
from_key: &[u8],
|
||||
to_key: &[u8],
|
||||
) -> crate::Result<()> {
|
||||
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
|
||||
let mut conn = self.conn_pool.get_conn().await?;
|
||||
|
||||
let s = conn
|
||||
.prep(&format!(
|
||||
"DELETE FROM {} WHERE k >= ? AND k < ?",
|
||||
char::from(subspace),
|
||||
char::from(from.subspace()),
|
||||
))
|
||||
.await?;
|
||||
conn.exec_drop(&s, (&from_key, &to_key))
|
||||
conn.exec_drop(&s, (&from.serialize(false), &to.serialize(false)))
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
|
|
|
@ -306,21 +306,16 @@ impl PostgresStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_range(
|
||||
&self,
|
||||
subspace: u8,
|
||||
from_key: &[u8],
|
||||
to_key: &[u8],
|
||||
) -> crate::Result<()> {
|
||||
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
|
||||
let conn = self.conn_pool.get().await?;
|
||||
|
||||
let s = conn
|
||||
.prepare_cached(&format!(
|
||||
"DELETE FROM {} WHERE k >= $1 AND k < $2",
|
||||
char::from(subspace),
|
||||
char::from(from.subspace()),
|
||||
))
|
||||
.await?;
|
||||
conn.execute(&s, &[&from_key, &to_key])
|
||||
conn.execute(&s, &[&from.serialize(false), &to.serialize(false)])
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(Into::into)
|
||||
|
|
|
@ -225,26 +225,23 @@ impl RocksDbStore {
|
|||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_range(
|
||||
&self,
|
||||
subspace: u8,
|
||||
from_key: &[u8],
|
||||
to_key: &[u8],
|
||||
) -> crate::Result<()> {
|
||||
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
|
||||
let db = self.db.clone();
|
||||
self.spawn_worker(move || {
|
||||
let cf = db
|
||||
.cf_handle(std::str::from_utf8(&[subspace]).unwrap())
|
||||
.cf_handle(std::str::from_utf8(&[from.subspace()]).unwrap())
|
||||
.unwrap();
|
||||
|
||||
// TODO use delete_range when implemented (see https://github.com/rust-rocksdb/rust-rocksdb/issues/839)
|
||||
let from = from.serialize(false);
|
||||
let to = to.serialize(false);
|
||||
let mut delete_keys = Vec::new();
|
||||
let it_mode = IteratorMode::From(from_key, Direction::Forward);
|
||||
let it_mode = IteratorMode::From(&from, Direction::Forward);
|
||||
|
||||
for row in db.iterator_cf(&cf, it_mode) {
|
||||
let (key, _) = row?;
|
||||
|
||||
if key.as_ref() < from_key || key.as_ref() >= to_key {
|
||||
if key.as_ref() < from.as_slice() || key.as_ref() >= to.as_slice() {
|
||||
break;
|
||||
}
|
||||
delete_keys.push(key);
|
||||
|
|
|
@ -204,19 +204,14 @@ impl SqliteStore {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_range(
|
||||
&self,
|
||||
subspace: u8,
|
||||
from_key: &[u8],
|
||||
to_key: &[u8],
|
||||
) -> crate::Result<()> {
|
||||
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
|
||||
let conn = self.conn_pool.get()?;
|
||||
self.spawn_worker(move || {
|
||||
conn.prepare_cached(&format!(
|
||||
"DELETE FROM {} WHERE k >= ? AND k < ?",
|
||||
char::from(subspace),
|
||||
char::from(from.subspace()),
|
||||
))?
|
||||
.execute([from_key, to_key])?;
|
||||
.execute([from.serialize(false), to.serialize(false)])?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
|
|
|
@ -26,7 +26,7 @@ use std::ops::{BitAndAssign, Range};
|
|||
use roaring::RoaringBitmap;
|
||||
|
||||
use crate::{
|
||||
write::{key::KeySerializer, Batch, BitmapClass, ValueClass},
|
||||
write::{key::KeySerializer, AnyKey, Batch, BitmapClass, ValueClass},
|
||||
BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, SUBSPACE_BITMAPS,
|
||||
SUBSPACE_INDEXES, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN,
|
||||
};
|
||||
|
@ -169,37 +169,39 @@ impl Store {
|
|||
Self::RocksDb(store) => store.purge_bitmaps().await,
|
||||
}
|
||||
}
|
||||
pub(crate) async fn delete_range(
|
||||
&self,
|
||||
subspace: u8,
|
||||
from: &[u8],
|
||||
to: &[u8],
|
||||
) -> crate::Result<()> {
|
||||
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
|
||||
match self {
|
||||
#[cfg(feature = "sqlite")]
|
||||
Self::SQLite(store) => store.delete_range(subspace, from, to).await,
|
||||
Self::SQLite(store) => store.delete_range(from, to).await,
|
||||
#[cfg(feature = "foundation")]
|
||||
Self::FoundationDb(store) => store.delete_range(subspace, from, to).await,
|
||||
Self::FoundationDb(store) => store.delete_range(from, to).await,
|
||||
#[cfg(feature = "postgres")]
|
||||
Self::PostgreSQL(store) => store.delete_range(subspace, from, to).await,
|
||||
Self::PostgreSQL(store) => store.delete_range(from, to).await,
|
||||
#[cfg(feature = "mysql")]
|
||||
Self::MySQL(store) => store.delete_range(subspace, from, to).await,
|
||||
Self::MySQL(store) => store.delete_range(from, to).await,
|
||||
#[cfg(feature = "rocks")]
|
||||
Self::RocksDb(store) => store.delete_range(subspace, from, to).await,
|
||||
Self::RocksDb(store) => store.delete_range(from, to).await,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
|
||||
let from_key = KeySerializer::new(U32_LEN).write(account_id).finalize();
|
||||
let to_key = KeySerializer::new(U32_LEN).write(account_id + 1).finalize();
|
||||
|
||||
for subspace in [
|
||||
SUBSPACE_BITMAPS,
|
||||
SUBSPACE_VALUES,
|
||||
SUBSPACE_LOGS,
|
||||
SUBSPACE_INDEXES,
|
||||
] {
|
||||
self.delete_range(subspace, &from_key, &to_key).await?;
|
||||
self.delete_range(
|
||||
AnyKey {
|
||||
subspace,
|
||||
key: KeySerializer::new(U32_LEN).write(account_id).finalize(),
|
||||
},
|
||||
AnyKey {
|
||||
subspace,
|
||||
key: KeySerializer::new(U32_LEN).write(account_id + 1).finalize(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
for (from_key, to_key) in [
|
||||
|
@ -209,15 +211,13 @@ impl Store {
|
|||
collection: 0,
|
||||
document_id: 0,
|
||||
class: ValueClass::Acl(account_id),
|
||||
}
|
||||
.serialize(false),
|
||||
},
|
||||
ValueKey {
|
||||
account_id: 0,
|
||||
collection: 0,
|
||||
document_id: 0,
|
||||
class: ValueClass::Acl(account_id + 1),
|
||||
}
|
||||
.serialize(false),
|
||||
},
|
||||
),
|
||||
(
|
||||
ValueKey {
|
||||
|
@ -225,19 +225,16 @@ impl Store {
|
|||
collection: 0,
|
||||
document_id: 0,
|
||||
class: ValueClass::ReservedId,
|
||||
}
|
||||
.serialize(false),
|
||||
},
|
||||
ValueKey {
|
||||
account_id: account_id + 1,
|
||||
collection: 0,
|
||||
document_id: 0,
|
||||
class: ValueClass::ReservedId,
|
||||
}
|
||||
.serialize(false),
|
||||
},
|
||||
),
|
||||
] {
|
||||
self.delete_range(SUBSPACE_INDEX_VALUES, &from_key, &to_key)
|
||||
.await?;
|
||||
self.delete_range(from_key, to_key).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -303,9 +300,13 @@ impl Store {
|
|||
SUBSPACE_BLOB_DATA,
|
||||
] {
|
||||
self.delete_range(
|
||||
AnyKey {
|
||||
subspace,
|
||||
&[0u8],
|
||||
&[
|
||||
key: &[0u8],
|
||||
},
|
||||
AnyKey {
|
||||
subspace,
|
||||
key: &[
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
|
@ -314,6 +315,7 @@ impl Store {
|
|||
u8::MAX,
|
||||
u8::MAX,
|
||||
],
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
@ -479,7 +481,24 @@ impl Store {
|
|||
}
|
||||
|
||||
// Delete logs
|
||||
self.delete_range(SUBSPACE_LOGS, &[0u8], &[u8::MAX, u8::MAX, u8::MAX, u8::MAX])
|
||||
self.delete_range(
|
||||
AnyKey {
|
||||
subspace: SUBSPACE_LOGS,
|
||||
key: &[0u8],
|
||||
},
|
||||
AnyKey {
|
||||
subspace: SUBSPACE_LOGS,
|
||||
key: &[
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
],
|
||||
},
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
|
|
|
@ -418,14 +418,15 @@ impl<T: AsRef<BlobHash> + Sync + Send> Key for BlobKey<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl Key for AnyKey {
|
||||
impl<T: AsRef<[u8]> + Sync + Send> Key for AnyKey<T> {
|
||||
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
|
||||
let key = self.key.as_ref();
|
||||
if include_subspace {
|
||||
KeySerializer::new(self.key.len() + 1).write(self.subspace)
|
||||
KeySerializer::new(key.len() + 1).write(self.subspace)
|
||||
} else {
|
||||
KeySerializer::new(self.key.len())
|
||||
KeySerializer::new(key.len())
|
||||
}
|
||||
.write(self.key.as_slice())
|
||||
.write(key)
|
||||
.finalize()
|
||||
}
|
||||
|
||||
|
|
|
@ -160,9 +160,9 @@ pub enum BlobOp {
|
|||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
|
||||
pub struct AnyKey {
|
||||
pub struct AnyKey<T: AsRef<[u8]>> {
|
||||
pub subspace: u8,
|
||||
pub key: Vec<u8>,
|
||||
pub key: T,
|
||||
}
|
||||
|
||||
impl From<u32> for TagValue {
|
||||
|
|
Loading…
Add table
Reference in a new issue