Mirror of https://github.com/stalwartlabs/mail-server.git (synced 2025-09-11 06:24:19 +08:00)

FDB chunked values support

parent db564ae0db
commit 166020b45b
12 changed files with 541 additions and 98 deletions
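FoundationDB limits a single value to 100,000 bytes and a transaction to roughly 10 MB, so anything larger has to be spread over several keys. This commit does that for the FDB backend: a value (or, behind the new fdb-chunked-bm feature, a serialized bitmap) longer than MAX_VALUE_SIZE is split into chunks, with chunk 0 stored under the original key and every further chunk under the same key extended by a single suffix byte (0, 1, 2, ...); readers probe suffixes until one is missing. Blobs, which were already block-split, reuse the shared MAX_VALUE_SIZE constant and now batch several chunks per transaction. The sketch below is an illustrative, standalone rendering of the value-key layout as I read the diff; the helper is made up and is not code from the repository.

// Illustrative sketch (not repository code): the chunked key layout assumed by
// this commit for oversized values. Chunk 0 lives at the base key; chunk i
// (i >= 1) lives at the base key plus one trailing byte i - 1. The write path
// rejects values that would need a suffix byte of u8::MAX or more.
const MAX_VALUE_SIZE: usize = 100_000; // FoundationDB's per-value limit

fn chunk_keys(base: &[u8], value_len: usize) -> Vec<Vec<u8>> {
    let n_chunks = std::cmp::max(value_len.div_ceil(MAX_VALUE_SIZE), 1);
    (0..n_chunks)
        .map(|pos| {
            let mut key = base.to_vec();
            if pos > 0 {
                key.push((pos - 1) as u8); // one-byte chunk suffix: 0, 1, 2, ...
            }
            key
        })
        .collect()
}

fn main() {
    // A 250 kB value occupies three keys: base, base + [0], base + [1].
    assert_eq!(chunk_keys(b"k", 250_000).len(), 3);
}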
@@ -32,7 +32,7 @@ jemallocator = "0.5.0"
 [features]
 #default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"]
-default = ["sqlite", "postgres", "mysql"]
+default = ["sqlite", "postgres", "mysql", "foundationdb"]
 sqlite = ["store/sqlite"]
 foundationdb = ["store/foundation"]
 postgres = ["store/postgres"]
@@ -54,6 +54,7 @@ elastic = ["elasticsearch", "serde_json"]
 mysql = ["mysql_async"]
 s3 = ["rust-s3"]
 foundation = ["foundationdb", "futures"]
+fdb-chunked-bm = []

 test_mode = []
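Two feature changes underpin the rest of the diff: the default feature set now includes foundationdb (which maps to the store backend feature foundation), and a new, empty fdb-chunked-bm flag is added, apparently in the store crate. Judging from the code below, fdb-chunked-bm only gates the chunked RoaringBitmap storage path; chunking of oversized plain values and of blobs is always on for the FoundationDB backend.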
@@ -28,9 +28,7 @@ use futures::StreamExt;
 use crate::{write::key::KeySerializer, Error, BLOB_HASH_LEN, SUBSPACE_BLOB_DATA};

-use super::FdbStore;
-
-const MAX_BLOCK_SIZE: usize = 100000;
+use super::{FdbStore, MAX_VALUE_SIZE};

 impl FdbStore {
     pub(crate) async fn get_blob(
@@ -38,9 +36,9 @@ impl FdbStore {
         key: &[u8],
         range: Range<u32>,
     ) -> crate::Result<Option<Vec<u8>>> {
-        let block_start = range.start as usize / MAX_BLOCK_SIZE;
-        let bytes_start = range.start as usize % MAX_BLOCK_SIZE;
-        let block_end = (range.end as usize / MAX_BLOCK_SIZE) + 1;
+        let block_start = range.start as usize / MAX_VALUE_SIZE;
+        let bytes_start = range.start as usize % MAX_VALUE_SIZE;
+        let block_end = (range.end as usize / MAX_VALUE_SIZE) + 1;

         let begin = KeySerializer::new(key.len() + 3)
             .write(SUBSPACE_BLOB_DATA)
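A worked example of the block arithmetic above, as a standalone sketch assuming MAX_VALUE_SIZE = 100,000 (not repository code):

// Standalone illustration of get_blob's chunk-index arithmetic.
const MAX_VALUE_SIZE: usize = 100_000;

fn blocks_for(range: std::ops::Range<u32>) -> (usize, usize, usize) {
    let block_start = range.start as usize / MAX_VALUE_SIZE;
    let bytes_start = range.start as usize % MAX_VALUE_SIZE;
    let block_end = (range.end as usize / MAX_VALUE_SIZE) + 1;
    (block_start, bytes_start, block_end)
}

fn main() {
    // Reading bytes 150_000..250_000 starts in block 1, skips the first
    // 50_000 bytes of that block, and scans up to (but not including) block 3.
    assert_eq!(blocks_for(150_000..250_000), (1, 50_000, 3));
}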
@@ -89,8 +87,8 @@ impl FdbStore {
             } else {
                 let blob_size = if blob_range <= (5 * (1 << 20)) {
                     blob_range
-                } else if value.len() == MAX_BLOCK_SIZE {
-                    MAX_BLOCK_SIZE * 2
+                } else if value.len() == MAX_VALUE_SIZE {
+                    MAX_VALUE_SIZE * 2
                 } else {
                     value.len()
                 };
@@ -116,8 +114,19 @@ impl FdbStore {
     }

     pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
-        for (chunk_pos, chunk_bytes) in data.chunks(MAX_BLOCK_SIZE).enumerate() {
-            let trx = self.db.create_trx()?;
+        const N_CHUNKS: usize = (1 << 5) - 1;
+        let last_chunk = std::cmp::max(
+            (data.len() / MAX_VALUE_SIZE)
+                + if data.len() % MAX_VALUE_SIZE > 0 {
+                    1
+                } else {
+                    0
+                },
+            1,
+        ) - 1;
+        let mut trx = self.db.create_trx()?;
+
+        for (chunk_pos, chunk_bytes) in data.chunks(MAX_VALUE_SIZE).enumerate() {
             trx.set(
                 &KeySerializer::new(key.len() + 3)
                     .write(SUBSPACE_BLOB_DATA)
@@ -126,9 +135,16 @@ impl FdbStore {
                     .finalize(),
                 chunk_bytes,
             );
-            trx.commit()
-                .await
-                .map_err(|err| Error::from(FdbError::from(err)))?;
+            if chunk_pos == last_chunk || (chunk_pos > 0 && chunk_pos % N_CHUNKS == 0) {
+                trx.commit()
+                    .await
+                    .map_err(|err| Error::from(FdbError::from(err)))?;
+                if chunk_pos < last_chunk {
+                    trx = self.db.create_trx()?;
+                } else {
+                    break;
+                }
+            }
         }

         Ok(())
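The put_blob rewrite changes the commit cadence rather than the key layout: instead of opening and committing one transaction per chunk, it commits after every N_CHUNKS = 31 chunks (about 3 MB of blob data) and after the final chunk, which keeps each transaction well below FoundationDB's 10 MB limit while cutting round trips. A small standalone sketch of which chunk positions trigger a commit, as I read the new loop (illustrative only):

// Which chunk positions commit, per the condition in the loop above.
const N_CHUNKS: usize = (1 << 5) - 1; // 31

fn commit_points(total_chunks: usize) -> Vec<usize> {
    let last_chunk = total_chunks.saturating_sub(1);
    (0..total_chunks)
        .filter(|&pos| pos == last_chunk || (pos > 0 && pos % N_CHUNKS == 0))
        .collect()
}

fn main() {
    // A blob spanning 70 chunks (~7 MB) commits after chunks 31, 62 and 69.
    assert_eq!(commit_points(70), vec![31, 62, 69]);
}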
@@ -30,6 +30,8 @@ pub mod main;
 pub mod read;
 pub mod write;

+const MAX_VALUE_SIZE: usize = 100000;
+
 #[allow(dead_code)]
 pub struct FdbStore {
     db: Database,
@@ -22,19 +22,38 @@
  */

 use foundationdb::{
+    future::FdbSlice,
     options::{self, StreamingMode},
-    KeySelector, RangeOption,
+    KeySelector, RangeOption, Transaction,
 };
 use futures::StreamExt;
 use roaring::RoaringBitmap;

 use crate::{
-    write::{bitmap::DeserializeBlock, key::DeserializeBigEndian, BitmapClass, ValueClass},
+    write::{
+        bitmap::DeserializeBlock,
+        key::{DeserializeBigEndian, KeySerializer},
+        BitmapClass, ValueClass,
+    },
     BitmapKey, Deserialize, IterateParams, Key, ValueKey, SUBSPACE_BLOBS, SUBSPACE_INDEXES,
-    U32_LEN,
+    SUBSPACE_VALUES, U32_LEN,
 };

-use super::FdbStore;
+use super::{FdbStore, MAX_VALUE_SIZE};
+
+#[cfg(feature = "fdb-chunked-bm")]
+pub(crate) enum ChunkedBitmap {
+    Single(RoaringBitmap),
+    Chunked { n_chunks: u8, bitmap: RoaringBitmap },
+    None,
+}
+
+#[allow(dead_code)]
+pub(crate) enum ChunkedValue {
+    Single(FdbSlice),
+    Chunked { n_chunks: u8, bytes: Vec<u8> },
+    None,
+}

 impl FdbStore {
     pub(crate) async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
@@ -44,10 +63,10 @@ impl FdbStore {
         let key = key.serialize(true);
         let trx = self.db.create_trx()?;

-        if let Some(bytes) = trx.get(&key, true).await? {
-            U::deserialize(&bytes).map(Some)
-        } else {
-            Ok(None)
+        match read_chunked_value(&key, &trx, true).await? {
+            ChunkedValue::Single(bytes) => U::deserialize(&bytes).map(Some),
+            ChunkedValue::Chunked { bytes, .. } => U::deserialize(&bytes).map(Some),
+            ChunkedValue::None => Ok(None),
         }
     }
@@ -55,35 +74,45 @@ impl FdbStore {
         &self,
         mut key: BitmapKey<BitmapClass>,
     ) -> crate::Result<Option<RoaringBitmap>> {
-        let mut bm = RoaringBitmap::new();
-        let begin = key.serialize(true);
-        key.block_num = u32::MAX;
-        let end = key.serialize(true);
-        let key_len = begin.len();
-        let trx = self.db.create_trx()?;
-        let mut values = trx.get_ranges(
-            RangeOption {
-                begin: KeySelector::first_greater_or_equal(begin),
-                end: KeySelector::first_greater_or_equal(end),
-                mode: StreamingMode::WantAll,
-                reverse: false,
-                ..RangeOption::default()
-            },
-            true,
-        );
+        #[cfg(feature = "fdb-chunked-bm")]
+        {
+            read_chunked_bitmap(&key.serialize(true), &self.db.create_trx()?, true)
+                .await
+                .map(Into::into)
+        }

-        while let Some(values) = values.next().await {
-            for value in values? {
-                let key = value.key();
-                if key.len() == key_len {
-                    bm.deserialize_block(
-                        value.value(),
-                        key.deserialize_be_u32(key.len() - U32_LEN)?,
-                    );
+        #[cfg(not(feature = "fdb-chunked-bm"))]
+        {
+            let mut bm = RoaringBitmap::new();
+            let begin = key.serialize(true);
+            key.block_num = u32::MAX;
+            let end = key.serialize(true);
+            let key_len = begin.len();
+            let trx = self.db.create_trx()?;
+            let mut values = trx.get_ranges(
+                RangeOption {
+                    begin: KeySelector::first_greater_or_equal(begin),
+                    end: KeySelector::first_greater_or_equal(end),
+                    mode: StreamingMode::WantAll,
+                    reverse: false,
+                    ..RangeOption::default()
+                },
+                true,
+            );
+
+            while let Some(values) = values.next().await {
+                for value in values? {
+                    let key = value.key();
+                    if key.len() == key_len {
+                        bm.deserialize_block(
+                            value.value(),
+                            key.deserialize_be_u32(key.len() - U32_LEN)?,
+                        );
+                    }
                 }
             }
+            Ok(if !bm.is_empty() { Some(bm) } else { None })
         }
-        Ok(if !bm.is_empty() { Some(bm) } else { None })
     }

     pub(crate) async fn iterate<T: Key>(
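Summarising the hunk above: with fdb-chunked-bm enabled, get_bitmap loads one serialized RoaringBitmap through read_chunked_bitmap (defined later in this file) and converts it via the From<ChunkedBitmap> impl; with the feature disabled, the previous implementation, a range scan over per-block bitmap keys merged with deserialize_block, is preserved unchanged inside the #[cfg(not(...))] block.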
@@ -140,7 +169,7 @@ impl FdbStore {

     #[cfg(feature = "test_mode")]
     pub(crate) async fn assert_is_empty(&self) {
-        use crate::{SUBSPACE_BITMAPS, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS, SUBSPACE_VALUES};
+        use crate::{SUBSPACE_BITMAPS, SUBSPACE_INDEX_VALUES, SUBSPACE_LOGS};

         let conn = self.db.create_trx().unwrap();

@@ -228,3 +257,68 @@ impl FdbStore {
         trx.commit().await.unwrap();
     }
 }
+
+pub(crate) async fn read_chunked_value(
+    key: &[u8],
+    trx: &Transaction,
+    snapshot: bool,
+) -> crate::Result<ChunkedValue> {
+    if let Some(bytes) = trx.get(key, snapshot).await? {
+        if bytes.len() < MAX_VALUE_SIZE {
+            Ok(ChunkedValue::Single(bytes))
+        } else {
+            let mut value = Vec::with_capacity(bytes.len() * 2);
+            value.extend_from_slice(&bytes);
+            let mut key = KeySerializer::new(key.len() + 1)
+                .write(key)
+                .write(0u8)
+                .finalize();
+
+            while let Some(bytes) = trx.get(&key, snapshot).await? {
+                value.extend_from_slice(&bytes);
+                *key.last_mut().unwrap() += 1;
+            }
+
+            Ok(ChunkedValue::Chunked {
+                bytes: value,
+                n_chunks: *key.last().unwrap(),
+            })
+        }
+    } else {
+        Ok(ChunkedValue::None)
+    }
+}
+
+#[cfg(feature = "fdb-chunked-bm")]
+pub(crate) async fn read_chunked_bitmap(
+    key: &[u8],
+    trx: &Transaction,
+    snapshot: bool,
+) -> crate::Result<ChunkedBitmap> {
+    match read_chunked_value(key, trx, snapshot).await? {
+        ChunkedValue::Single(bytes) => RoaringBitmap::deserialize_unchecked_from(bytes.as_ref())
+            .map(ChunkedBitmap::Single)
+            .map_err(|e| {
+                crate::Error::InternalError(format!("Failed to deserialize bitmap: {}", e))
+            }),
+        ChunkedValue::Chunked { bytes, n_chunks } => {
+            RoaringBitmap::deserialize_unchecked_from(bytes.as_slice())
+                .map(|bitmap| ChunkedBitmap::Chunked { n_chunks, bitmap })
+                .map_err(|e| {
+                    crate::Error::InternalError(format!("Failed to deserialize bitmap: {}", e))
+                })
+        }
+        ChunkedValue::None => Ok(ChunkedBitmap::None),
+    }
+}
+
+#[cfg(feature = "fdb-chunked-bm")]
+impl From<ChunkedBitmap> for Option<RoaringBitmap> {
+    fn from(bitmap: ChunkedBitmap) -> Self {
+        match bitmap {
+            ChunkedBitmap::Single(bitmap) => Some(bitmap),
+            ChunkedBitmap::Chunked { bitmap, .. } => Some(bitmap),
+            ChunkedBitmap::None => None,
+        }
+    }
+}
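read_chunked_value is the read half of the scheme: it fetches the base key, and only if the stored slice is a full MAX_VALUE_SIZE long (so a continuation may exist) does it start probing key + [0], key + [1], ... and concatenating until a suffix is missing. Below is a minimal standalone sketch of that loop with a BTreeMap standing in for the FoundationDB transaction; it is illustrative only and uses a tiny chunk size so the example stays readable.

// Standalone sketch of the chunk-reassembly loop above, with a BTreeMap
// standing in for the FoundationDB transaction (illustrative only).
use std::collections::BTreeMap;

const MAX_VALUE_SIZE: usize = 5; // tiny limit to keep the example readable

fn read_chunked(kv: &BTreeMap<Vec<u8>, Vec<u8>>, key: &[u8]) -> Option<Vec<u8>> {
    let first = kv.get(key)?;
    if first.len() < MAX_VALUE_SIZE {
        return Some(first.clone()); // a short first chunk means no continuation
    }
    let mut value = first.clone();
    let mut chunk_key = key.to_vec();
    chunk_key.push(0); // first continuation suffix
    while let Some(chunk) = kv.get(&chunk_key) {
        value.extend_from_slice(chunk);
        *chunk_key.last_mut().unwrap() += 1; // next suffix: 1, 2, ...
    }
    Some(value)
}

fn main() {
    let mut kv = BTreeMap::new();
    kv.insert(b"k".to_vec(), b"hello".to_vec()); // exactly MAX_VALUE_SIZE long
    kv.insert(b"k\x00".to_vec(), b" worl".to_vec());
    kv.insert(b"k\x01".to_vec(), b"d".to_vec());
    assert_eq!(read_chunked(&kv, b"k"), Some(b"hello world".to_vec()));
}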
@@ -21,7 +21,10 @@
  * for more details.
  */

-use std::time::{Duration, Instant};
+use std::{
+    cmp::Ordering,
+    time::{Duration, Instant},
+};

 use ahash::AHashMap;
 use foundationdb::{
@@ -37,17 +40,43 @@ use crate::{
         key::KeySerializer,
         Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
     },
-    BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_BITMAPS,
+    BitmapKey, BlobKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_BITMAPS, SUBSPACE_VALUES,
 };

-use super::FdbStore;
+use super::{
+    read::{read_chunked_value, ChunkedValue},
+    FdbStore, MAX_VALUE_SIZE,
+};
+
+#[cfg(feature = "fdb-chunked-bm")]
+use super::read::{read_chunked_bitmap, ChunkedBitmap};
+
+#[cfg(feature = "fdb-chunked-bm")]
+use roaring::RoaringBitmap;
+
+#[cfg(feature = "fdb-chunked-bm")]
+struct BitmapOp {
+    document_id: u32,
+    set: bool,
+}
+
+#[cfg(feature = "fdb-chunked-bm")]
+impl BitmapOp {
+    fn new(document_id: u32, set: bool) -> Self {
+        Self { document_id, set }
+    }
+}

 impl FdbStore {
     pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
         let start = Instant::now();
         let mut retry_count = 0;
+        #[cfg(not(feature = "fdb-chunked-bm"))]
         let mut set_bitmaps = AHashMap::new();
+        #[cfg(not(feature = "fdb-chunked-bm"))]
        let mut clear_bitmaps = AHashMap::new();
+        #[cfg(feature = "fdb-chunked-bm")]
+        let mut bitmaps = AHashMap::new();

         loop {
             let mut account_id = u32::MAX;
@@ -88,16 +117,39 @@ impl FdbStore {
                     trx.atomic_op(&key, &by.to_le_bytes()[..], MutationType::Add);
                 }
                 Operation::Value { class, op } => {
-                    let key = ValueKey {
+                    let mut key = ValueKey {
                         account_id,
                         collection,
                         document_id,
                         class,
                     }
                     .serialize(true);
+                    let do_chunk = key[0] == SUBSPACE_VALUES;

                     if let ValueOp::Set(value) = op {
-                        trx.set(&key, value);
+                        if !value.is_empty() && do_chunk {
+                            for (pos, chunk) in value.chunks(MAX_VALUE_SIZE).enumerate() {
+                                match pos.cmp(&1) {
+                                    Ordering::Less => {}
+                                    Ordering::Equal => {
+                                        key.push(0);
+                                    }
+                                    Ordering::Greater => {
+                                        if pos < u8::MAX as usize {
+                                            *key.last_mut().unwrap() += 1;
+                                        } else {
+                                            trx.cancel();
+                                            return Err(crate::Error::InternalError(
+                                                "Value too large".into(),
+                                            ));
+                                        }
+                                    }
+                                }
+                                trx.set(&key, chunk);
+                            }
+                        } else {
+                            trx.set(&key, value);
+                        }

                         if matches!(class, ValueClass::ReservedId) {
                             let block_num = DenseBitmap::block_num(document_id);
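Only keys in the values subspace are chunked (the do_chunk test on the key's subspace byte); everything else keeps the single trx.set. Inside the loop, chunk 0 is written under the bare serialized key, chunk 1 pushes a 0x00 suffix byte, and each later chunk increments that byte, cancelling the transaction once the suffix would reach u8::MAX. A standalone sketch of the key each chunk position lands under (hypothetical helper, not the crate's API):

// Sketch of how the loop above derives the key for each value chunk
// (illustrative; the real code mutates one buffer in place).
fn chunk_key(base: &[u8], pos: usize) -> Option<Vec<u8>> {
    match pos {
        0 => Some(base.to_vec()), // first chunk: the bare key
        p if p < u8::MAX as usize => {
            let mut key = base.to_vec();
            key.push((p - 1) as u8); // suffix 0, 1, 2, ...
            Some(key)
        }
        _ => None, // rejected as "Value too large"
    }
}

fn main() {
    assert_eq!(chunk_key(b"v", 0), Some(b"v".to_vec()));
    assert_eq!(chunk_key(b"v", 2), Some(b"v\x01".to_vec()));
    assert_eq!(chunk_key(b"v", 255), None);
}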
@@ -120,6 +172,14 @@ impl FdbStore {
                                 }
                             }
                         }
+                    } else if do_chunk {
+                        trx.clear_range(
+                            &key,
+                            &KeySerializer::new(key.len() + 1)
+                                .write(key.as_slice())
+                                .write(u8::MAX)
+                                .finalize(),
+                        );
                     } else {
                         trx.clear(&key);
                     }
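When a chunkable value is cleared, the code does not need to know how many chunks were written: clear_range removes everything from the base key (inclusive) up to the base key with a 0xFF suffix (exclusive), i.e. the base key plus every suffix byte the write path can produce. A small sketch of those bounds (illustrative only; FoundationDB treats the end key as exclusive):

// Sketch of the clear_range bounds used above: [key, key ++ 0xFF) removes the
// base key and every one-byte chunk suffix below 0xFF.
fn clear_bounds(key: &[u8]) -> (Vec<u8>, Vec<u8>) {
    let mut end = key.to_vec();
    end.push(u8::MAX);
    (key.to_vec(), end) // begin inclusive, end exclusive
}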
@@ -142,6 +202,7 @@ impl FdbStore {
                 }
                 Operation::Bitmap { class, set } => {
                     if retry_count == 0 {
+                        #[cfg(not(feature = "fdb-chunked-bm"))]
                         if *set {
                             &mut set_bitmaps
                         } else {
@@ -158,6 +219,20 @@ impl FdbStore {
                         )
                         .or_insert_with(DenseBitmap::empty)
                         .set(document_id);
+
+                        #[cfg(feature = "fdb-chunked-bm")]
+                        bitmaps
+                            .entry(
+                                BitmapKey {
+                                    account_id,
+                                    collection,
+                                    class,
+                                    block_num: 0,
+                                }
+                                .serialize(true),
+                            )
+                            .or_insert(Vec::new())
+                            .push(BitmapOp::new(document_id, *set));
                     }
                 }
                 Operation::Blob { hash, op, set } => {
@@ -201,14 +276,13 @@ impl FdbStore {
                     }
                     .serialize(true);

-                    let matches = if let Ok(bytes) = trx.get(&key, false).await {
-                        if let Some(bytes) = bytes {
+                    let matches = match read_chunked_value(&key, &trx, false).await {
+                        Ok(ChunkedValue::Single(bytes)) => assert_value.matches(bytes.as_ref()),
+                        Ok(ChunkedValue::Chunked { bytes, .. }) => {
                             assert_value.matches(bytes.as_ref())
-                        } else {
-                            assert_value.is_none()
                         }
-                    } else {
-                        false
+                        Ok(ChunkedValue::None) => assert_value.is_none(),
+                        Err(_) => false,
                     };

                     if !matches {
@@ -219,12 +293,98 @@ impl FdbStore {
                     }
                 }
             }

-            for (key, bitmap) in &set_bitmaps {
-                trx.atomic_op(key, &bitmap.bitmap, MutationType::BitOr);
+            #[cfg(not(feature = "fdb-chunked-bm"))]
+            {
+                for (key, bitmap) in &set_bitmaps {
+                    trx.atomic_op(key, &bitmap.bitmap, MutationType::BitOr);
                 }

+                for (key, bitmap) in &clear_bitmaps {
+                    trx.atomic_op(key, &bitmap.bitmap, MutationType::BitXor);
+                }
+            }
+
-            for (key, bitmap) in &clear_bitmaps {
-                trx.atomic_op(key, &bitmap.bitmap, MutationType::BitXor);
+            // Write bitmaps
+            #[cfg(feature = "fdb-chunked-bm")]
+            for (key, bitmap_ops) in &bitmaps {
+                let (mut bitmap, exists, n_chunks) =
+                    match read_chunked_bitmap(key, &trx, false).await? {
+                        ChunkedBitmap::Single(bitmap) => (bitmap, true, 0u8),
+                        ChunkedBitmap::Chunked { n_chunks, bitmap } => (bitmap, true, n_chunks),
+                        ChunkedBitmap::None => (RoaringBitmap::new(), false, 0u8),
+                    };
+
+                for bitmap_op in bitmap_ops {
+                    if bitmap_op.set {
+                        bitmap.insert(bitmap_op.document_id);
+                    } else {
+                        bitmap.remove(bitmap_op.document_id);
+                    }
+                }
+
+                if !bitmap.is_empty() {
+                    let mut bytes = Vec::with_capacity(bitmap.serialized_size());
+                    bitmap.serialize_into(&mut bytes).map_err(|_| {
+                        crate::Error::InternalError("Failed to serialize bitmap".into())
+                    })?;
+                    let mut key = KeySerializer::new(key.len() + 1)
+                        .write(key.as_slice())
+                        .finalize();
+                    let mut chunk_diff = n_chunks;
+
+                    for (pos, chunk) in bytes.chunks(MAX_VALUE_SIZE).enumerate() {
+                        match pos.cmp(&1) {
+                            Ordering::Less => {}
+                            Ordering::Equal => {
+                                key.push(0);
+                                if n_chunks > 0 {
+                                    chunk_diff -= 1;
+                                }
+                            }
+                            Ordering::Greater => {
+                                if pos < u8::MAX as usize {
+                                    *key.last_mut().unwrap() += 1;
+                                    if n_chunks > 0 {
+                                        chunk_diff -= 1;
+                                    }
+                                } else {
+                                    trx.cancel();
+                                    return Err(crate::Error::InternalError(
+                                        "Bitmap value too large".into(),
+                                    ));
+                                }
+                            }
+                        }
+                        trx.set(&key, chunk);
+                    }
+
+                    // Delete any additional chunks
+                    if chunk_diff > 0 {
+                        let mut key = KeySerializer::new(key.len() + 1)
+                            .write(key.as_slice())
+                            .write(0u8)
+                            .finalize();
+                        for chunk in (0..n_chunks).rev().take(chunk_diff as usize) {
+                            *key.last_mut().unwrap() = chunk;
+                            trx.clear(&key);
+                        }
+                    }
+                } else if exists {
+                    // Delete main key
+                    trx.clear(key);
+
+                    // Delete additional chunked keys
+                    if n_chunks > 0 {
+                        let mut key = KeySerializer::new(key.len() + 1)
+                            .write(key.as_slice())
+                            .write(0u8)
+                            .finalize();
+                        for chunk in 0..n_chunks {
+                            *key.last_mut().unwrap() = chunk;
+                            trx.clear(&key);
+                        }
+                    }
+                }
             }

             match trx.commit().await {
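Under fdb-chunked-bm, bitmap updates become a read-modify-write: the queued BitmapOps for each key are applied to the bitmap returned by read_chunked_bitmap, the result is re-serialized and re-chunked with the same suffix-byte scheme as values, and chunk_diff records how many previously used suffix keys are no longer needed so they can be cleared; an empty result deletes the base key and every chunk. Unlike the atomic BitOr/BitXor path this requires a read inside the write transaction, presumably relying on FoundationDB's conflict detection for safety under concurrency. A small sketch of the clean-up rule as I read it (illustrative only):

// Sketch of the chunk clean-up above: if the re-serialized bitmap now needs
// fewer continuation chunks than before, the highest old suffixes are cleared.
fn suffixes_to_clear(old_n_chunks: u8, new_n_chunks: u8) -> Vec<u8> {
    let diff = old_n_chunks.saturating_sub(new_n_chunks) as usize;
    (0..old_n_chunks).rev().take(diff).collect()
}

fn main() {
    // A bitmap that used suffixes 0..=3 but now needs only 0..=1 clears 3 and 2.
    assert_eq!(suffixes_to_clear(4, 2), vec![3, 2]);
}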
@@ -236,7 +236,7 @@ impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
             }
             .write(self.account_id)
             .write(self.collection)
-            .write_leb128(self.document_id)
+            .write(self.document_id)
             .write(*field),
             ValueClass::Acl(grant_account_id) => if include_subspace {
                 KeySerializer::new(U32_LEN * 3 + 3).write(crate::SUBSPACE_INDEX_VALUES)
@@ -262,7 +262,7 @@ impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
             }
             .write(self.account_id)
             .write(self.collection)
-            .write_leb128(self.document_id)
+            .write(self.document_id)
             .write(u8::MAX),
             ValueClass::ReservedId => if include_subspace {
                 KeySerializer::new(U32_LEN * 2 + 2).write(crate::SUBSPACE_INDEX_VALUES)
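Both hunks replace the LEB128-encoded document id in these value keys with a fixed-width write, presumably so every key of a given class has a predictable length now that chunk suffix bytes are appended to, and stripped from, the ends of value keys. A quick illustration of the difference in encoded length (the leb128 helper below is hypothetical, written only for this note; the crate's KeySerializer is assumed to emit the u32 in a fixed four-byte form):

// Hypothetical comparison: LEB128 output varies with the value, a plain
// big-endian u32 is always four bytes, so the key length stays constant.
fn leb128(mut v: u32) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        if v < 0x80 {
            out.push(v as u8);
            return out;
        }
        out.push((v & 0x7F) as u8 | 0x80);
        v >>= 7;
    }
}

fn main() {
    assert_eq!(leb128(5).len(), 1);
    assert_eq!(leb128(300).len(), 2);
    assert_eq!(300u32.to_be_bytes().len(), 4); // fixed width
}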
@@ -6,7 +6,7 @@ resolver = "2"

 [features]
 #default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"]
-default = ["sqlite", "postgres", "mysql"]
+default = ["sqlite", "postgres", "mysql", "foundationdb"]
 sqlite = ["store/sqlite"]
 foundationdb = ["store/foundation"]
 postgres = ["store/postgres"]
@@ -172,9 +172,9 @@ private-key = "file://{PK}"
 directory = "auth"

 [jmap.store]
-data = "sqlite"
-fts = "sqlite"
-blob = "sqlite"
+data = "{STORE}"
+fts = "{STORE}"
+blob = "{STORE}"

 [jmap.protocol]
 set.max-objects = 100000
@@ -262,11 +262,13 @@ pub struct IMAPTest {
     shutdown_tx: watch::Sender<bool>,
 }

-async fn init_imap_tests(delete_if_exists: bool) -> IMAPTest {
+async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
     // Load and parse config
     let temp_dir = TempDir::new("imap_tests", delete_if_exists);
     let config = utils::config::Config::new(
-        &add_test_certs(SERVER).replace("{TMP}", &temp_dir.path.display().to_string()),
+        &add_test_certs(SERVER)
+            .replace("{STORE}", store_id)
+            .replace("{TMP}", &temp_dir.path.display().to_string()),
     )
     .unwrap();
     let servers = config.parse_servers().unwrap();
@@ -347,16 +349,29 @@ async fn init_imap_tests(delete_if_exists: bool) -> IMAPTest {

 #[tokio::test]
 pub async fn imap_tests() {
-    /*tracing::subscriber::set_global_default(
-        tracing_subscriber::FmtSubscriber::builder()
-            .with_max_level(tracing::Level::DEBUG)
-            .finish(),
-    )
-    .unwrap();*/
+    if let Ok(level) = std::env::var("LOG") {
+        tracing::subscriber::set_global_default(
+            tracing_subscriber::FmtSubscriber::builder()
+                .with_env_filter(
+                    tracing_subscriber::EnvFilter::builder()
+                        .parse(
+                            format!("smtp={level},imap={level},jmap={level},store={level},utils={level},directory={level}"),
+                        )
+                        .unwrap(),
+                )
+                .finish(),
+        )
+        .unwrap();
+    }

     // Prepare settings
     let delete = true;
-    let handle = init_imap_tests(delete).await;
+    let handle = init_imap_tests(
+        &std::env::var("STORE")
+            .expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
+        delete,
+    )
+    .await;

     // Connect to IMAP server
     let mut imap_check = ImapConnection::connect(b"_y ").await;
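With the configuration templated on {STORE}, the IMAP and JMAP suites no longer hard-code SQLite: the backing store comes from the STORE environment variable (for example STORE=foundationdb cargo test, per the expect message above), and setting LOG to a tracing level such as debug enables the subscriber that used to be a commented-out block.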
@@ -173,9 +173,9 @@ private-key = "file://{PK}"
 directory = "auth"

 [jmap.store]
-data = "sqlite"
-fts = "sqlite"
-blob = "sqlite"
+data = "{STORE}"
+fts = "{STORE}"
+blob = "{STORE}"

 [jmap.protocol]
 set.max-objects = 100000
@@ -258,22 +258,28 @@ refresh-token-renew = "2s"

 #[tokio::test]
 pub async fn jmap_tests() {
-    /*let level = "warn";
-    tracing::subscriber::set_global_default(
-        tracing_subscriber::FmtSubscriber::builder()
-            .with_env_filter(
-                tracing_subscriber::EnvFilter::builder()
-                    .parse(
-                        format!("smtp={level},imap={level},jmap={level},store={level},utils={level},directory={level}"),
-                    )
-                    .unwrap(),
-            )
-            .finish(),
-    )
-    .unwrap();*/
+    if let Ok(level) = std::env::var("LOG") {
+        tracing::subscriber::set_global_default(
+            tracing_subscriber::FmtSubscriber::builder()
+                .with_env_filter(
+                    tracing_subscriber::EnvFilter::builder()
+                        .parse(
+                            format!("smtp={level},imap={level},jmap={level},store={level},utils={level},directory={level}"),
+                        )
+                        .unwrap(),
+                )
+                .finish(),
+        )
+        .unwrap();
+    }

     let delete = true;
-    let mut params = init_jmap_tests(delete).await;
+    let mut params = init_jmap_tests(
+        &std::env::var("STORE")
+            .expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
+        delete,
+    )
+    .await;
     email_query::test(&mut params, delete).await;
     email_get::test(&mut params).await;
     email_set::test(&mut params).await;
@@ -314,7 +320,7 @@ pub async fn jmap_stress_tests() {
     )
     .unwrap();

-    let params = init_jmap_tests(true).await;
+    let params = init_jmap_tests("foundationdb", true).await;
     stress_test::test(params.server.clone(), params.client).await;
     params.temp_dir.delete();
 }
@@ -355,11 +361,13 @@ pub async fn assert_is_empty(server: Arc<JMAP>) {
         .await;
 }

-async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
+async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
     // Load and parse config
     let temp_dir = TempDir::new("jmap_tests", delete_if_exists);
     let config = utils::config::Config::new(
-        &add_test_certs(SERVER).replace("{TMP}", &temp_dir.path.display().to_string()),
+        &add_test_certs(SERVER)
+            .replace("{STORE}", store_id)
+            .replace("{TMP}", &temp_dir.path.display().to_string()),
     )
     .unwrap();
     let servers = config.parse_servers().unwrap();
@@ -23,6 +23,7 @@

 pub mod assign_id;
 pub mod blob;
+pub mod ops;
 pub mod query;

 use std::io::Read;
@@ -87,6 +88,7 @@ pub async fn store_tests() {
     if insert {
         store.destroy().await;
     }
+    ops::test(store.clone()).await;
     query::test(store.clone(), FtsStore::Store(store.clone()), insert).await;
     assign_id::test(store).await;
 }
tests/src/store/ops.rs — 145 lines (new file, all lines added)

@@ -0,0 +1,145 @@
/*
 * Copyright (c) 2023, Stalwart Labs Ltd.
 *
 * This file is part of the Stalwart Mail Server.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 * in the LICENSE file at the top-level directory of this distribution.
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * You can be released from the requirements of the AGPLv3 license by
 * purchasing a commercial license. Please contact licensing@stalw.art
 * for more details.
 */

use store::{
    write::{BatchBuilder, ValueClass},
    Store, ValueKey,
};

// FDB max value
const MAX_VALUE_SIZE: usize = 100000;

pub async fn test(db: Store) {
    for (test_num, value) in [
        vec![b'A'; 0],
        vec![b'A'; 1],
        vec![b'A'; 100],
        vec![b'A'; MAX_VALUE_SIZE],
        vec![b'B'; MAX_VALUE_SIZE + 1],
        vec![b'C'; MAX_VALUE_SIZE]
            .into_iter()
            .chain(vec![b'D'; MAX_VALUE_SIZE])
            .chain(vec![b'E'; MAX_VALUE_SIZE])
            .collect::<Vec<_>>(),
        vec![b'F'; MAX_VALUE_SIZE]
            .into_iter()
            .chain(vec![b'G'; MAX_VALUE_SIZE])
            .chain(vec![b'H'; MAX_VALUE_SIZE + 1])
            .collect::<Vec<_>>(),
    ]
    .into_iter()
    .enumerate()
    {
        // Write value
        let test_len = value.len();
        db.write(
            BatchBuilder::new()
                .with_account_id(0)
                .with_collection(0)
                .with_account_id(0)
                .update_document(0)
                .set(ValueClass::Property(1), value.as_slice())
                .set(ValueClass::Property(0), "check1")
                .set(ValueClass::Property(2), "check2")
                .build_batch(),
        )
        .await
        .unwrap();

        // Fetch value
        assert_eq!(
            String::from_utf8(value).unwrap(),
            db.get_value::<String>(ValueKey {
                account_id: 0,
                collection: 0,
                document_id: 0,
                class: ValueClass::Property(1),
            })
            .await
            .unwrap()
            .unwrap_or_else(|| panic!("no value for test {test_num} with value length {test_len}")),
            "failed for test {test_num} with value length {test_len}"
        );

        // Delete value
        db.write(
            BatchBuilder::new()
                .with_account_id(0)
                .with_collection(0)
                .with_account_id(0)
                .update_document(0)
                .clear(ValueClass::Property(1))
                .build_batch(),
        )
        .await
        .unwrap();

        // Make sure value is deleted
        assert_eq!(
            None,
            db.get_value::<String>(ValueKey {
                account_id: 0,
                collection: 0,
                document_id: 0,
                class: ValueClass::Property(1),
            })
            .await
            .unwrap()
        );

        // Make sure other values are still there
        for (class, value) in [
            (ValueClass::Property(0), "check1"),
            (ValueClass::Property(2), "check2"),
        ] {
            assert_eq!(
                Some(value.to_string()),
                db.get_value::<String>(ValueKey {
                    account_id: 0,
                    collection: 0,
                    document_id: 0,
                    class,
                })
                .await
                .unwrap()
            );
        }

        // Delete everything
        db.write(
            BatchBuilder::new()
                .with_account_id(0)
                .with_collection(0)
                .with_account_id(0)
                .update_document(0)
                .clear(ValueClass::Property(0))
                .clear(ValueClass::Property(2))
                .build_batch(),
        )
        .await
        .unwrap();

        // Make sure everything is deleted
        db.assert_is_empty(db.clone().into()).await;
    }
}
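The new tests/src/store/ops.rs exercises both sides of the chunk boundary: empty, one-byte, 100-byte, exactly MAX_VALUE_SIZE, one byte over, three full chunks, and three full chunks plus one byte, each written, read back, deleted, and checked against two small sibling properties. Under the chunking scheme described above, these sizes should map to the following number of FoundationDB keys (a rough, illustrative expectation, not an assertion made by the test itself):

// Rough expectation for how many keys each test value should occupy under the
// chunking scheme (chunk 0 at the base key, the rest behind suffix bytes).
const MAX_VALUE_SIZE: usize = 100_000;

fn expected_keys(len: usize) -> usize {
    std::cmp::max(len.div_ceil(MAX_VALUE_SIZE), 1)
}

fn main() {
    for (len, keys) in [
        (0, 1), // an empty value still gets its base key
        (1, 1),
        (MAX_VALUE_SIZE, 1),
        (MAX_VALUE_SIZE + 1, 2),
        (3 * MAX_VALUE_SIZE, 3),
        (3 * MAX_VALUE_SIZE + 1, 4),
    ] {
        assert_eq!(expected_keys(len), keys);
    }
}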