RocksDB stress test fixes + find_merge_thread() bugfix

This commit is contained in:
mdecimus 2023-12-20 17:06:32 +01:00
parent f7313eecaf
commit d4aca0a8e0
28 changed files with 819 additions and 268 deletions

5
Cargo.lock generated
View file

@ -1305,9 +1305,9 @@ dependencies = [
[[package]] [[package]]
name = "deadpool-postgres" name = "deadpool-postgres"
version = "0.11.0" version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40b75ba49590d27f677d3bebaf76cd15889ca8b308bc7ba99bfa25f1d7269c13" checksum = "bda39fa1cfff190d8924d447ad04fd22772c250438ca5ce1dfb3c80621c05aaa"
dependencies = [ dependencies = [
"deadpool", "deadpool",
"tokio", "tokio",
@ -5496,6 +5496,7 @@ dependencies = [
"num_cpus", "num_cpus",
"prettytable-rs", "prettytable-rs",
"pwhash", "pwhash",
"rand",
"reqwest", "reqwest",
"rpassword", "rpassword",
"serde", "serde",

View file

@ -28,3 +28,4 @@ form_urlencoded = "1.1.0"
human-size = "0.4.2" human-size = "0.4.2"
futures = "0.3.28" futures = "0.3.28"
pwhash = "1.0.0" pwhash = "1.0.0"
rand = "0.8.5"

View file

@ -29,6 +29,7 @@ use std::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Mutex, Arc, Mutex,
}, },
time::Duration,
}; };
use console::style; use console::style;
@ -42,6 +43,7 @@ use mail_parser::mailbox::{
maildir, maildir,
mbox::{self, MessageIterator}, mbox::{self, MessageIterator},
}; };
use rand::Rng;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use tokio::{fs::File, io::AsyncReadExt}; use tokio::{fs::File, io::AsyncReadExt};
@ -366,6 +368,10 @@ impl ImportCommands {
total_imported.fetch_add(1, Ordering::Relaxed); total_imported.fetch_add(1, Ordering::Relaxed);
} }
Err(_) if retry_count < RETRY_ATTEMPTS => { Err(_) if retry_count < RETRY_ATTEMPTS => {
let backoff =
rand::thread_rng().gen_range(50..=300);
tokio::time::sleep(Duration::from_millis(backoff))
.await;
retry_count += 1; retry_count += 1;
continue; continue;
} }

View file

@ -232,9 +232,9 @@ impl From<store::Error> for DirectoryError {
tracing::warn!( tracing::warn!(
context = "directory", context = "directory",
event = "error", event = "error",
protocol = "sql", protocol = "store",
reason = %error, reason = %error,
"SQL directory error" "Directory error"
); );
DirectoryError::Store(error) DirectoryError::Store(error)

View file

@ -400,13 +400,6 @@ impl SessionData {
.await; .await;
} }
// Resynchronize source mailbox on a successful move
if did_move {
self.write_mailbox_changes(&src_mailbox, is_qresync)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
}
// Map copied JMAP Ids to IMAP UIDs in the destination folder. // Map copied JMAP Ids to IMAP UIDs in the destination folder.
if copied_ids.is_empty() { if copied_ids.is_empty() {
return Err(if response.rtype != ResponseType::Ok { return Err(if response.rtype != ResponseType::Ok {
@ -437,8 +430,8 @@ impl SessionData {
src_uids.sort_unstable(); src_uids.sort_unstable();
dest_uids.sort_unstable(); dest_uids.sort_unstable();
self.write_bytes(if is_move { let response = if is_move {
response.with_tag(arguments.tag).serialize( self.write_bytes(
StatusResponse::ok("Copied UIDs") StatusResponse::ok("Copied UIDs")
.with_code(ResponseCode::CopyUid { .with_code(ResponseCode::CopyUid {
uid_validity, uid_validity,
@ -447,6 +440,16 @@ impl SessionData {
}) })
.into_bytes(), .into_bytes(),
) )
.await;
if did_move {
// Resynchronize source mailbox on a successful move
self.write_mailbox_changes(&src_mailbox, is_qresync)
.await
.map_err(|r| r.with_tag(&arguments.tag))?;
}
response.with_tag(arguments.tag).into_bytes()
} else { } else {
response response
.with_tag(arguments.tag) .with_tag(arguments.tag)
@ -456,8 +459,9 @@ impl SessionData {
dest_uids, dest_uids,
}) })
.into_bytes() .into_bytes()
}) };
.await;
self.write_bytes(response).await;
Ok(()) Ok(())
} }

View file

@ -56,7 +56,7 @@ use utils::map::vec_map::VecMap;
use crate::{auth::AccessToken, mailbox::UidMailbox, services::housekeeper::Event, Bincode, JMAP}; use crate::{auth::AccessToken, mailbox::UidMailbox, services::housekeeper::Event, Bincode, JMAP};
use super::{ use super::{
index::{EmailIndexBuilder, TrimTextValue, MAX_SORT_FIELD_LENGTH}, index::{EmailIndexBuilder, TrimTextValue, VisitValues, MAX_ID_LENGTH, MAX_SORT_FIELD_LENGTH},
ingest::IngestedEmail, ingest::IngestedEmail,
metadata::MessageMetadata, metadata::MessageMetadata,
}; };
@ -322,30 +322,34 @@ impl JMAP {
} }
// Obtain threadId // Obtain threadId
let mut references = vec![]; let mut references = Vec::with_capacity(5);
let mut subject = ""; let mut subject = "";
for header in &metadata.contents.parts[0].headers { for header in &metadata.contents.parts[0].headers {
match header.name { match &header.name {
HeaderName::MessageId HeaderName::MessageId
| HeaderName::InReplyTo | HeaderName::InReplyTo
| HeaderName::References | HeaderName::References
| HeaderName::ResentMessageId => match &header.value { | HeaderName::ResentMessageId => {
HeaderValue::Text(text) => { header.value.visit_text(|id| {
references.push(text.as_ref()); if !id.is_empty() && id.len() < MAX_ID_LENGTH {
} references.push(id);
HeaderValue::TextList(list) => { }
references.extend(list.iter().map(|v| v.as_ref())); });
} }
_ => (), HeaderName::Subject if subject.is_empty() => {
}, subject = thread_name(match &header.value {
HeaderName::Subject => { HeaderValue::Text(text) => text.as_ref(),
if let HeaderValue::Text(value) = &header.value { HeaderValue::TextList(list) if !list.is_empty() => {
subject = thread_name(value).trim_text(MAX_SORT_FIELD_LENGTH); list.first().unwrap().as_ref()
} }
_ => "",
})
.trim_text(MAX_SORT_FIELD_LENGTH);
} }
_ => (), _ => (),
} }
} }
let thread_id = if !references.is_empty() { let thread_id = if !references.is_empty() {
self.find_or_merge_thread(account_id, subject, &references) self.find_or_merge_thread(account_id, subject, &references)
.await .await

View file

@ -140,7 +140,7 @@ impl JMAP {
mailbox_ids, mailbox_ids,
keywords: email.keywords, keywords: email.keywords,
received_at: email.received_at.map(|r| r.into()), received_at: email.received_at.map(|r| r.into()),
skip_duplicates: false, skip_duplicates: true,
encrypt: self.config.encrypt && self.config.encrypt_append, encrypt: self.config.encrypt && self.config.encrypt_append,
}) })
.await .await

View file

@ -184,14 +184,20 @@ impl IndexMessage for BatchBuilder {
} }
match header.name { match header.name {
HeaderName::MessageId HeaderName::MessageId => {
| HeaderName::InReplyTo
| HeaderName::References
| HeaderName::ResentMessageId => {
header.value.visit_text(|id| { header.value.visit_text(|id| {
// Add ids to inverted index // Add ids to inverted index
if id.len() < MAX_ID_LENGTH { if id.len() < MAX_ID_LENGTH {
self.value(Property::MessageId, id, F_INDEX | options); self.value(Property::MessageId, id, F_INDEX | options);
self.value(Property::References, id, F_INDEX | options);
}
});
}
HeaderName::InReplyTo | HeaderName::References | HeaderName::ResentMessageId => {
header.value.visit_text(|id| {
// Add ids to inverted index
if id.len() < MAX_ID_LENGTH {
self.value(Property::References, id, F_INDEX | options);
} }
}); });
} }
@ -523,21 +529,21 @@ impl GetContentLanguage for MessagePart<'_> {
} }
} }
trait VisitValues { pub trait VisitValues<'x> {
fn visit_addresses(&self, visitor: impl FnMut(AddressElement, &str)); fn visit_addresses<'y: 'x>(&'y self, visitor: impl FnMut(AddressElement, &'x str));
fn visit_text(&self, visitor: impl FnMut(&str)); fn visit_text<'y: 'x>(&'y self, visitor: impl FnMut(&'x str));
fn into_visit_text(self, visitor: impl FnMut(String)); fn into_visit_text(self, visitor: impl FnMut(String));
} }
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
enum AddressElement { pub enum AddressElement {
Name, Name,
Address, Address,
GroupName, GroupName,
} }
impl VisitValues for HeaderValue<'_> { impl<'x> VisitValues<'x> for HeaderValue<'x> {
fn visit_addresses(&self, mut visitor: impl FnMut(AddressElement, &str)) { fn visit_addresses<'y: 'x>(&'y self, mut visitor: impl FnMut(AddressElement, &'x str)) {
match self { match self {
HeaderValue::Address(Address::List(addr_list)) => { HeaderValue::Address(Address::List(addr_list)) => {
for addr in addr_list { for addr in addr_list {
@ -569,7 +575,7 @@ impl VisitValues for HeaderValue<'_> {
} }
} }
fn visit_text(&self, mut visitor: impl FnMut(&str)) { fn visit_text<'y: 'x>(&'y self, mut visitor: impl FnMut(&'x str)) {
match &self { match &self {
HeaderValue::Text(text) => { HeaderValue::Text(text) => {
visitor(text.as_ref()); visitor(text.as_ref());

View file

@ -21,7 +21,7 @@
* for more details. * for more details.
*/ */
use std::borrow::Cow; use std::{borrow::Cow, time::Duration};
use jmap_proto::{ use jmap_proto::{
object::Object, object::Object,
@ -34,6 +34,7 @@ use mail_parser::{
parsers::fields::thread::thread_name, HeaderName, HeaderValue, Message, PartType, parsers::fields::thread::thread_name, HeaderName, HeaderValue, Message, PartType,
}; };
use rand::Rng;
use store::{ use store::{
ahash::AHashSet, ahash::AHashSet,
query::Filter, query::Filter,
@ -46,7 +47,7 @@ use store::{
use utils::map::vec_map::VecMap; use utils::map::vec_map::VecMap;
use crate::{ use crate::{
email::index::{IndexMessage, MAX_ID_LENGTH}, email::index::{IndexMessage, VisitValues, MAX_ID_LENGTH},
mailbox::UidMailbox, mailbox::UidMailbox,
services::housekeeper::Event, services::housekeeper::Event,
IngestError, JMAP, IngestError, JMAP,
@ -77,6 +78,8 @@ pub struct IngestEmail<'x> {
pub encrypt: bool, pub encrypt: bool,
} }
const MAX_RETRIES: u32 = 10;
impl JMAP { impl JMAP {
#[allow(clippy::blocks_in_if_conditions)] #[allow(clippy::blocks_in_if_conditions)]
pub async fn email_ingest( pub async fn email_ingest(
@ -107,24 +110,26 @@ impl JMAP {
let thread_id = { let thread_id = {
let mut references = Vec::with_capacity(5); let mut references = Vec::with_capacity(5);
let mut subject = ""; let mut subject = "";
let mut message_id = "";
for header in message.root_part().headers().iter().rev() { for header in message.root_part().headers().iter().rev() {
match header.name { match &header.name {
HeaderName::MessageId HeaderName::MessageId => header.value.visit_text(|id| {
| HeaderName::InReplyTo if !id.is_empty() && id.len() < MAX_ID_LENGTH {
| HeaderName::References if message_id.is_empty() {
| HeaderName::ResentMessageId => match &header.value { message_id = id;
HeaderValue::Text(id) if id.len() < MAX_ID_LENGTH => {
references.push(id.as_ref());
}
HeaderValue::TextList(ids) => {
for id in ids {
if id.len() < MAX_ID_LENGTH {
references.push(id.as_ref());
}
} }
references.push(id);
} }
_ => (), }),
}, HeaderName::InReplyTo
| HeaderName::References
| HeaderName::ResentMessageId => {
header.value.visit_text(|id| {
if !id.is_empty() && id.len() < MAX_ID_LENGTH {
references.push(id);
}
});
}
HeaderName::Subject if subject.is_empty() => { HeaderName::Subject if subject.is_empty() => {
subject = thread_name(match &header.value { subject = thread_name(match &header.value {
HeaderValue::Text(text) => text.as_ref(), HeaderValue::Text(text) => text.as_ref(),
@ -141,24 +146,21 @@ impl JMAP {
// Check for duplicates // Check for duplicates
if params.skip_duplicates if params.skip_duplicates
&& !references.is_empty() && !message_id.is_empty()
&& !self && !self
.store .store
.filter( .filter(
params.account_id, params.account_id,
Collection::Email, Collection::Email,
references vec![Filter::eq(Property::MessageId, message_id)],
.iter()
.map(|id| Filter::eq(Property::MessageId, *id))
.collect(),
) )
.await .await
.map_err(|err| { .map_err(|err| {
tracing::error!( tracing::error!(
event = "error", event = "error",
context = "find_duplicates", context = "find_duplicates",
error = ?err, error = ?err,
"Duplicate message search failed."); "Duplicate message search failed.");
IngestError::Temporary IngestError::Temporary
})? })?
.results .results
@ -383,7 +385,7 @@ impl JMAP {
)); ));
filters.push(Filter::Or); filters.push(Filter::Or);
for reference in references { for reference in references {
filters.push(Filter::eq(Property::MessageId, *reference)); filters.push(Filter::eq(Property::References, *reference));
} }
filters.push(Filter::End); filters.push(Filter::End);
let results = self let results = self
@ -515,7 +517,9 @@ impl JMAP {
match self.store.write(batch.build()).await { match self.store.write(batch.build()).await {
Ok(_) => return Ok(Some(thread_id)), Ok(_) => return Ok(Some(thread_id)),
Err(store::Error::AssertValueFailed) if try_count < 3 => { Err(store::Error::AssertValueFailed) if try_count < MAX_RETRIES => {
let backoff = rand::thread_rng().gen_range(50..=300);
tokio::time::sleep(Duration::from_millis(backoff)).await;
try_count += 1; try_count += 1;
} }
Err(err) => { Err(err) => {

View file

@ -171,6 +171,7 @@ pub struct Bincode<T: serde::Serialize + serde::de::DeserializeOwned> {
pub inner: T, pub inner: T,
} }
#[derive(Debug)]
pub enum IngestError { pub enum IngestError {
Temporary, Temporary,
OverQuota, OverQuota,

View file

@ -241,6 +241,7 @@ impl JMAP {
stack.push((children, it)); stack.push((children, it));
} }
} }
debug_assert_eq!(response.ids.len(), paginate.ids.len(), "{tree:#?}");
response.update_results(paginate.build())?; response.update_results(paginate.build())?;
} else { } else {
response = self response = self

View file

@ -46,7 +46,11 @@ use jmap_proto::{
use store::{ use store::{
query::Filter, query::Filter,
roaring::RoaringBitmap, roaring::RoaringBitmap,
write::{assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, F_BITMAP, F_CLEAR, F_VALUE}, write::{
assert::{AssertValue, HashedValue},
log::ChangeLogBuilder,
BatchBuilder, F_BITMAP, F_CLEAR, F_VALUE,
},
}; };
use crate::{ use crate::{
@ -115,13 +119,45 @@ impl JMAP {
.await?; .await?;
batch batch
.with_account_id(account_id) .with_account_id(account_id)
.with_collection(Collection::Mailbox) .with_collection(Collection::Mailbox);
.create_document(document_id)
.custom(builder); if let Value::Id(parent_id) =
builder.changes().unwrap().get(&Property::ParentId)
{
let parent_id = parent_id.document_id();
if parent_id > 0 {
batch
.update_document(parent_id - 1)
.assert_value(Property::Value, AssertValue::Some);
}
}
batch.create_document(document_id).custom(builder);
changes.log_insert(Collection::Mailbox, document_id); changes.log_insert(Collection::Mailbox, document_id);
ctx.mailbox_ids.insert(document_id); ctx.mailbox_ids.insert(document_id);
self.write_batch(batch).await?; match self.store.write(batch.build()).await {
ctx.response.created(id, document_id); Ok(_) => {
ctx.response.created(id, document_id);
}
Err(store::Error::AssertValueFailed) => {
ctx.response.not_created.append(
id,
SetError::forbidden().with_description(
"Another process deleted the parent mailbox, please try again.",
),
);
continue 'create;
}
Err(err) => {
tracing::error!(
event = "error",
context = "mailbox_set",
account_id = account_id,
error = ?err,
"Failed to update mailbox(es).");
return Err(MethodError::ServerPartialFail);
}
}
} }
Err(err) => { Err(err) => {
ctx.response.not_created.append(id, err); ctx.response.not_created.append(id, err);
@ -182,9 +218,21 @@ impl JMAP {
let mut batch = BatchBuilder::new(); let mut batch = BatchBuilder::new();
batch batch
.with_account_id(account_id) .with_account_id(account_id)
.with_collection(Collection::Mailbox) .with_collection(Collection::Mailbox);
.update_document(document_id)
.custom(builder); if let Value::Id(parent_id) =
builder.changes().unwrap().get(&Property::ParentId)
{
let parent_id = parent_id.document_id();
if parent_id > 0 {
batch
.update_document(parent_id - 1)
.assert_value(Property::Value, AssertValue::Some);
}
}
batch.update_document(document_id).custom(builder);
if !batch.is_empty() { if !batch.is_empty() {
match self.store.write(batch.build()).await { match self.store.write(batch.build()).await {
Ok(_) => { Ok(_) => {

View file

@ -28,7 +28,7 @@ num_cpus = { version = "1.15.0", optional = true }
blake3 = "1.3.3" blake3 = "1.3.3"
tracing = "0.1" tracing = "0.1"
lz4_flex = { version = "0.11" } lz4_flex = { version = "0.11" }
deadpool-postgres = { version = "0.11.0", optional = true } deadpool-postgres = { version = "0.12.1", optional = true }
tokio-postgres = { version = "0.7.10", optional = true } tokio-postgres = { version = "0.7.10", optional = true }
tokio-rustls = { version = "0.25.0", optional = true } tokio-rustls = { version = "0.25.0", optional = true }
rustls = { version = "0.22.0", optional = true } rustls = { version = "0.22.0", optional = true }

View file

@ -154,10 +154,13 @@ impl PostgresStore {
.await? .await?
} }
} else { } else {
trx trx.prepare_cached(&format!(
.prepare_cached( concat!(
&format!("INSERT INTO {} (k, v) VALUES ($1, $2) ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v", table), "INSERT INTO {} (k, v) VALUES ($1, $2) ",
) "ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v"
),
table
))
.await? .await?
}; };
@ -242,7 +245,10 @@ impl PostgresStore {
.serialize(false); .serialize(false);
let s = trx let s = trx
.prepare_cached("INSERT INTO l (k, v) VALUES ($1, $2) ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v") .prepare_cached(concat!(
"INSERT INTO l (k, v) VALUES ($1, $2) ",
"ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v"
))
.await?; .await?;
trx.execute(&s, &[&key, set]).await?; trx.execute(&s, &[&key, set]).await?;
} }

View file

@ -22,20 +22,27 @@
*/ */
use std::{ use std::{
sync::Arc,
thread::sleep, thread::sleep,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use rand::Rng; use rand::Rng;
use rocksdb::{Direction, ErrorKind, IteratorMode}; use roaring::RoaringBitmap;
use rocksdb::{
BoundColumnFamily, Direction, ErrorKind, IteratorMode, OptimisticTransactionDB,
OptimisticTransactionOptions, WriteOptions,
};
use super::{ use super::{
bitmap::{clear_bit, set_bit}, bitmap::{clear_bit, set_bit},
RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES, RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES,
}; };
use crate::{ use crate::{
write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME}, write::{
BitmapKey, IndexKey, Key, LogKey, ValueKey, Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
},
BitmapKey, Deserialize, IndexKey, Key, LogKey, ValueKey,
}; };
impl RocksDbStore { impl RocksDbStore {
@ -46,138 +53,28 @@ impl RocksDbStore {
let start = Instant::now(); let start = Instant::now();
let mut retry_count = 0; let mut retry_count = 0;
let cf_bitmaps = db.cf_handle(CF_BITMAPS).unwrap(); let mut txn_opts = OptimisticTransactionOptions::default();
let cf_values = db.cf_handle(CF_VALUES).unwrap(); txn_opts.set_snapshot(true);
let cf_indexes = db.cf_handle(CF_INDEXES).unwrap();
let cf_logs = db.cf_handle(CF_LOGS).unwrap(); let txn = RocksDBTransaction {
let cf_counters = db.cf_handle(CF_COUNTERS).unwrap(); db: &db,
cf_bitmaps: db.cf_handle(CF_BITMAPS).unwrap(),
cf_values: db.cf_handle(CF_VALUES).unwrap(),
cf_indexes: db.cf_handle(CF_INDEXES).unwrap(),
cf_logs: db.cf_handle(CF_LOGS).unwrap(),
cf_counters: db.cf_handle(CF_COUNTERS).unwrap(),
txn_opts,
batch: &batch,
};
loop { loop {
let mut account_id = u32::MAX; match txn.commit() {
let mut collection = u8::MAX; Ok(success) => {
let mut document_id = u32::MAX; return if success {
Ok(())
let txn = self.db.transaction(); } else {
let mut wb = txn.get_writebatch(); Err(crate::Error::AssertValueFailed)
};
for op in &batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
} => {
document_id = *document_id_;
}
Operation::Value {
class,
op: ValueOp::Add(by),
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
wb.merge_cf(&cf_counters, &key, &by.to_le_bytes()[..]);
}
Operation::Value { class, op } => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let key = key.serialize(false);
if let ValueOp::Set(value) = op {
wb.put_cf(&cf_values, &key, value);
} else {
wb.delete_cf(&cf_values, &key);
}
}
Operation::Index { field, key, set } => {
let key = IndexKey {
account_id,
collection,
document_id,
field: *field,
key,
}
.serialize(false);
if *set {
wb.put_cf(&cf_indexes, &key, []);
} else {
wb.delete_cf(&cf_indexes, &key);
}
}
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
class,
block_num: 0,
}
.serialize(false);
let value = if *set {
set_bit(document_id)
} else {
clear_bit(document_id)
};
wb.merge_cf(&cf_bitmaps, key, value);
}
Operation::Log {
collection,
change_id,
set,
} => {
let key = LogKey {
account_id,
collection: *collection,
change_id: *change_id,
}
.serialize(false);
wb.put_cf(&cf_logs, &key, set);
}
Operation::AssertValue {
class,
assert_value,
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let key = key.serialize(false);
let matches = txn
.get_cf(&cf_values, &key)?
.map(|value| assert_value.matches(&value))
.unwrap_or_else(|| assert_value.is_none());
if !matches {
return Err(crate::Error::AssertValueFailed);
}
}
}
}
match db.write(wb) {
Ok(_) => {
return Ok(());
} }
Err(err) => match err.kind() { Err(err) => match err.kind() {
ErrorKind::Busy | ErrorKind::MergeInProgress | ErrorKind::TryAgain ErrorKind::Busy | ErrorKind::MergeInProgress | ErrorKind::TryAgain
@ -231,3 +128,269 @@ impl RocksDbStore {
Ok(()) Ok(())
} }
} }
struct RocksDBTransaction<'x> {
db: &'x OptimisticTransactionDB,
cf_bitmaps: Arc<BoundColumnFamily<'x>>,
cf_values: Arc<BoundColumnFamily<'x>>,
cf_indexes: Arc<BoundColumnFamily<'x>>,
cf_logs: Arc<BoundColumnFamily<'x>>,
cf_counters: Arc<BoundColumnFamily<'x>>,
txn_opts: OptimisticTransactionOptions,
batch: &'x Batch,
}
impl<'x> RocksDBTransaction<'x> {
fn commit(&self) -> Result<bool, rocksdb::Error> {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let txn = self
.db
.transaction_opt(&WriteOptions::default(), &self.txn_opts);
if !self.batch.is_atomic() {
for op in &self.batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
} => {
document_id = *document_id_;
}
Operation::Value {
class,
op: ValueOp::Add(by),
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
txn.merge_cf(&self.cf_counters, &key, &by.to_le_bytes()[..])?;
}
Operation::Value { class, op } => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let key = key.serialize(false);
if let ValueOp::Set(value) = op {
txn.put_cf(&self.cf_values, &key, value)?;
if matches!(class, ValueClass::ReservedId) {
if let Some(bitmap) = txn
.get_pinned_cf(
&self.cf_bitmaps,
&BitmapKey {
account_id,
collection,
class: BitmapClass::DocumentIds,
block_num: 0,
}
.serialize(false),
//true,
)?
.and_then(|bytes| RoaringBitmap::deserialize(&bytes).ok())
{
if bitmap.contains(document_id) {
txn.rollback()?;
return Ok(false);
}
}
}
} else {
txn.delete_cf(&self.cf_values, &key)?;
}
}
Operation::Index { field, key, set } => {
let key = IndexKey {
account_id,
collection,
document_id,
field: *field,
key,
}
.serialize(false);
if *set {
txn.put_cf(&self.cf_indexes, &key, [])?;
} else {
txn.delete_cf(&self.cf_indexes, &key)?;
}
}
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
class,
block_num: 0,
}
.serialize(false);
let value = if *set {
set_bit(document_id)
} else {
clear_bit(document_id)
};
txn.merge_cf(&self.cf_bitmaps, key, value)?;
}
Operation::Log {
collection,
change_id,
set,
} => {
let key = LogKey {
account_id,
collection: *collection,
change_id: *change_id,
}
.serialize(false);
txn.put_cf(&self.cf_logs, &key, set)?;
}
Operation::AssertValue {
class,
assert_value,
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let key = key.serialize(false);
let matches = txn
.get_pinned_for_update_cf(&self.cf_values, &key, true)?
.map(|value| assert_value.matches(&value))
.unwrap_or_else(|| assert_value.is_none());
if !matches {
txn.rollback()?;
return Ok(false);
}
}
}
}
txn.commit().map(|_| true)
} else {
let mut wb = txn.get_writebatch();
for op in &self.batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
} => {
document_id = *document_id_;
}
Operation::Value {
class,
op: ValueOp::Add(by),
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
wb.merge_cf(&self.cf_counters, &key, &by.to_le_bytes()[..]);
}
Operation::Value { class, op } => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
};
let key = key.serialize(false);
if let ValueOp::Set(value) = op {
wb.put_cf(&self.cf_values, &key, value);
} else {
wb.delete_cf(&self.cf_values, &key);
}
}
Operation::Index { field, key, set } => {
let key = IndexKey {
account_id,
collection,
document_id,
field: *field,
key,
}
.serialize(false);
if *set {
wb.put_cf(&self.cf_indexes, &key, []);
} else {
wb.delete_cf(&self.cf_indexes, &key);
}
}
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
class,
block_num: 0,
}
.serialize(false);
let value = if *set {
set_bit(document_id)
} else {
clear_bit(document_id)
};
wb.merge_cf(&self.cf_bitmaps, key, value);
}
Operation::Log {
collection,
change_id,
set,
} => {
let key = LogKey {
account_id,
collection: *collection,
change_id: *change_id,
}
.serialize(false);
wb.put_cf(&self.cf_logs, &key, set);
}
Operation::AssertValue { .. } => unreachable!(),
}
}
self.db.write(wb).map(|_| true)
}
}
}

View file

@ -31,6 +31,12 @@ use crate::{
SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN, SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN,
}; };
#[cfg(feature = "test_mode")]
lazy_static::lazy_static! {
pub static ref BITMAPS: std::sync::Arc<parking_lot::Mutex<std::collections::HashMap<Vec<u8>, std::collections::HashSet<u32>>>> =
std::sync::Arc::new(parking_lot::Mutex::new(std::collections::HashMap::new()));
}
impl Store { impl Store {
pub async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>> pub async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
where where
@ -141,6 +147,86 @@ impl Store {
} }
pub async fn write(&self, batch: Batch) -> crate::Result<()> { pub async fn write(&self, batch: Batch) -> crate::Result<()> {
#[cfg(feature = "test_mode")]
if std::env::var("PARANOID_WRITE").map_or(false, |v| v == "1") {
use crate::write::Operation;
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let mut bitmaps = Vec::new();
for op in &batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
} => {
document_id = *document_id_;
}
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
block_num: 0,
class,
}
.serialize(false);
bitmaps.push((key, class.clone(), document_id, *set));
}
_ => {}
}
}
match self {
#[cfg(feature = "sqlite")]
Self::SQLite(store) => store.write(batch).await,
#[cfg(feature = "foundation")]
Self::FoundationDb(store) => store.write(batch).await,
#[cfg(feature = "postgres")]
Self::PostgreSQL(store) => store.write(batch).await,
#[cfg(feature = "mysql")]
Self::MySQL(store) => store.write(batch).await,
#[cfg(feature = "rocks")]
Self::RocksDb(store) => store.write(batch).await,
}?;
for (key, class, document_id, set) in bitmaps {
let mut bitmaps = BITMAPS.lock();
let map = bitmaps.entry(key).or_default();
if set {
if !map.insert(document_id) {
println!(
concat!(
"WARNING: key {:?} already contains document {} for account ",
"{}, collection {}"
),
class, document_id, account_id, collection
);
}
} else if !map.remove(&document_id) {
println!(
concat!(
"WARNING: key {:?} does not contain document {} for account ",
"{}, collection {}"
),
class, document_id, account_id, collection
);
}
}
return Ok(());
}
match self { match self {
#[cfg(feature = "sqlite")] #[cfg(feature = "sqlite")]
Self::SQLite(store) => store.write(batch).await, Self::SQLite(store) => store.write(batch).await,
@ -303,6 +389,8 @@ impl Store {
.await .await
.unwrap(); .unwrap();
} }
BITMAPS.lock().clear();
} }
#[cfg(feature = "test_mode")] #[cfg(feature = "test_mode")]
@ -365,6 +453,8 @@ impl Store {
#[allow(unused_variables)] #[allow(unused_variables)]
pub async fn assert_is_empty(&self, blob_store: crate::BlobStore) { pub async fn assert_is_empty(&self, blob_store: crate::BlobStore) {
use utils::codec::leb128::Leb128Iterator;
use crate::{SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_VALUES}; use crate::{SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_VALUES};
self.blob_expire_all().await; self.blob_expire_all().await;
@ -406,9 +496,46 @@ impl Store {
return Ok(true); return Ok(true);
} }
const BM_DOCUMENT_IDS: u8 = 0;
const BM_TAG: u8 = 1 << 6;
const BM_TEXT: u8 = 1 << 7;
const TAG_TEXT: u8 = 1 << 0;
const TAG_STATIC: u8 = 1 << 1;
match key[5] {
BM_DOCUMENT_IDS => {
eprint!("Found document ids bitmap");
}
BM_TAG => {
eprint!(
"Found tagged id {} bitmap",
key[7..].iter().next_leb128::<u32>().unwrap()
);
}
TAG_TEXT => {
eprint!(
"Found tagged text {:?} bitmap",
String::from_utf8_lossy(&key[7..])
);
}
TAG_STATIC => {
eprint!("Found tagged static {} bitmap", key[7]);
}
other => {
if other & BM_TEXT == BM_TEXT {
eprint!(
"Found text hash {:?} bitmap",
String::from_utf8_lossy(&key[7..])
);
} else {
eprint!("Found unknown bitmap");
}
}
}
eprintln!( eprintln!(
concat!( concat!(
"Table bitmaps is not empty, account {}, collection {},", ", account {}, collection {},",
" family {}, field {}, key {:?}: {:?}" " family {}, field {}, key {:?}: {:?}"
), ),
u32::from_be_bytes(key[0..4].try_into().unwrap()), u32::from_be_bytes(key[0..4].try_into().unwrap()),
@ -420,7 +547,8 @@ impl Store {
); );
} }
SUBSPACE_VALUES SUBSPACE_VALUES
if key[0] >= 20 if key[0] == 3
|| key[0] >= 20
|| key.get(1..5).unwrap_or_default() == u32::MAX.to_be_bytes() => || key.get(1..5).unwrap_or_default() == u32::MAX.to_be_bytes() =>
{ {
// Ignore lastId counter and ID mappings // Ignore lastId counter and ID mappings
@ -433,7 +561,7 @@ impl Store {
SUBSPACE_INDEXES => { SUBSPACE_INDEXES => {
eprintln!( eprintln!(
concat!( concat!(
"Table index is not empty, account {}, collection {}, ", "Found index key, account {}, collection {}, ",
"document {}, property {}, value {:?}: {:?}" "document {}, property {}, value {:?}: {:?}"
), ),
u32::from_be_bytes(key[0..4].try_into().unwrap()), u32::from_be_bytes(key[0..4].try_into().unwrap()),
@ -446,7 +574,7 @@ impl Store {
} }
_ => { _ => {
eprintln!( eprintln!(
"Table {:?} is not empty: {:?} {:?}", "Found key in {:?}: {:?} {:?}",
char::from(subspace), char::from(subspace),
key, key,
value value

View file

@ -286,6 +286,14 @@ impl Store {
{ {
term_index term_index
} else { } else {
tracing::debug!(
context = "fts_remove",
event = "not_found",
account_id = account_id,
collection = collection,
document_id = document_id,
"Term index not found"
);
return Ok(false); return Ok(false);
}; };

View file

@ -40,7 +40,7 @@ pub struct Pagination {
anchor_offset: i32, anchor_offset: i32,
has_anchor: bool, has_anchor: bool,
anchor_found: bool, anchor_found: bool,
ids: Vec<u64>, pub ids: Vec<u64>,
prefix_key: Option<ValueKey<ValueClass>>, prefix_key: Option<ValueKey<ValueClass>>,
prefix_unique: bool, prefix_unique: bool,
} }

View file

@ -29,11 +29,12 @@ pub struct HashedValue<T: Deserialize> {
pub inner: T, pub inner: T,
} }
#[derive(Debug, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AssertValue { pub enum AssertValue {
U32(u32), U32(u32),
U64(u64), U64(u64),
Hash(u64), Hash(u64),
Some,
None, None,
} }
@ -47,6 +48,12 @@ pub trait ToAssertValue {
fn to_assert_value(&self) -> AssertValue; fn to_assert_value(&self) -> AssertValue;
} }
impl ToAssertValue for AssertValue {
fn to_assert_value(&self) -> AssertValue {
*self
}
}
impl ToAssertValue for () { impl ToAssertValue for () {
fn to_assert_value(&self) -> AssertValue { fn to_assert_value(&self) -> AssertValue {
AssertValue::None AssertValue::None
@ -84,6 +91,7 @@ impl AssertValue {
AssertValue::U64(v) => bytes.len() == U64_LEN && u64::deserialize(bytes).unwrap() == *v, AssertValue::U64(v) => bytes.len() == U64_LEN && u64::deserialize(bytes).unwrap() == *v,
AssertValue::Hash(v) => xxhash_rust::xxh3::xxh3_64(bytes) == *v, AssertValue::Hash(v) => xxhash_rust::xxh3::xxh3_64(bytes) == *v,
AssertValue::None => false, AssertValue::None => false,
AssertValue::Some => true,
} }
} }

View file

@ -202,6 +202,21 @@ impl BatchBuilder {
} }
} }
impl Batch {
pub fn is_atomic(&self) -> bool {
!self.ops.iter().any(|op| {
matches!(
op,
Operation::AssertValue { .. }
| Operation::Value {
class: ValueClass::ReservedId,
op: ValueOp::Set(_)
}
)
})
}
}
impl Default for BatchBuilder { impl Default for BatchBuilder {
fn default() -> Self { fn default() -> Self {
Self::new() Self::new()

View file

@ -44,6 +44,7 @@ sed -i '' -e "s|__CERT_PATH__|$BASE_DIR/etc/tls_cert.pem|g" \
-e "s|__PK_PATH__|$BASE_DIR/etc/tls_privatekey.pem|g" "$BASE_DIR/etc/common/tls.toml" -e "s|__PK_PATH__|$BASE_DIR/etc/tls_privatekey.pem|g" "$BASE_DIR/etc/common/tls.toml"
sed -i '' -e 's/method = "log"/method = "stdout"/g' \ sed -i '' -e 's/method = "log"/method = "stdout"/g' \
-e 's/level = "info"/level = "trace"/g' "$BASE_DIR/etc/common/tracing.toml" -e 's/level = "info"/level = "trace"/g' "$BASE_DIR/etc/common/tracing.toml"
sed -i '' -e 's/%{HOST}%/127.0.0.1/g' "$BASE_DIR/etc/jmap/listener.toml"
sed -i '' -e 's/allow-plain-text = false/allow-plain-text = true/g' "$BASE_DIR/etc/imap/settings.toml" sed -i '' -e 's/allow-plain-text = false/allow-plain-text = true/g' "$BASE_DIR/etc/imap/settings.toml"
sed -i '' -e 's/user = "stalwart-mail"//g' \ sed -i '' -e 's/user = "stalwart-mail"//g' \
-e 's/group = "stalwart-mail"//g' "$BASE_DIR/etc/common/server.toml" -e 's/group = "stalwart-mail"//g' "$BASE_DIR/etc/common/server.toml"
@ -67,4 +68,5 @@ cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c
cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret list create everyone everyone@example.org cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret list create everyone everyone@example.org
cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret list add-members everyone jane john bill cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret list add-members everyone jane john bill
cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret account list cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret account list
cargo run --manifest-path=crates/cli/Cargo.toml -- -u https://127.0.0.1:8080 -c admin:secret import messages --format mbox john _ignore/dovecot-crlf
' '

BIN
tests/resources/mailbox.gz Normal file

Binary file not shown.

View file

@ -25,7 +25,7 @@ use std::{collections::hash_map::Entry, time::Instant};
use crate::{ use crate::{
jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, wait_for_index}, jmap::{assert_is_empty, mailbox::destroy_all_mailboxes, wait_for_index},
store::{deflate_artwork_data, query::FIELDS}, store::{deflate_test_resource, query::FIELDS},
}; };
use jmap_client::{ use jmap_client::{
client::Client, client::Client,
@ -682,7 +682,7 @@ pub async fn create(client: &mut Client) {
'outer: for record in csv::ReaderBuilder::new() 'outer: for record in csv::ReaderBuilder::new()
.has_headers(true) .has_headers(true)
.from_reader(&deflate_artwork_data()[..]) .from_reader(&deflate_test_resource("artwork_data.csv.gz")[..])
.records() .records()
{ {
let record = record.unwrap(); let record = record.unwrap();

View file

@ -177,14 +177,17 @@ data = "{STORE}"
fts = "{STORE}" fts = "{STORE}"
blob = "{STORE}" blob = "{STORE}"
[jmap.protocol] [jmap.protocol.get]
set.max-objects = 100000 max-objects = 100000
[jmap.protocol.set]
max-objects = 100000
[jmap.protocol.request] [jmap.protocol.request]
max-concurrent = 8 max-concurrent = 8
[jmap.protocol.upload] [jmap.protocol.upload]
max-size = 5000000 max-size = 50000000
max-concurrent = 4 max-concurrent = 4
ttl = "1m" ttl = "1m"
@ -256,7 +259,7 @@ refresh-token = "3s"
refresh-token-renew = "2s" refresh-token-renew = "2s"
"#; "#;
#[tokio::test] #[tokio::test(flavor = "multi_thread")]
pub async fn jmap_tests() { pub async fn jmap_tests() {
if let Ok(level) = std::env::var("LOG") { if let Ok(level) = std::env::var("LOG") {
tracing::subscriber::set_global_default( tracing::subscriber::set_global_default(
@ -280,18 +283,20 @@ pub async fn jmap_tests() {
delete, delete,
) )
.await; .await;
//assert_is_empty(params.server.clone()).await;
let coco = 1; let coco = 1;
//email_query::test(&mut params, delete).await; //email_query::test(&mut params, delete).await;
email_get::test(&mut params).await; /*email_get::test(&mut params).await;
email_set::test(&mut params).await; email_set::test(&mut params).await;
email_parse::test(&mut params).await; email_parse::test(&mut params).await;
email_search_snippet::test(&mut params).await; email_search_snippet::test(&mut params).await;
email_changes::test(&mut params).await; email_changes::test(&mut params).await;
email_query_changes::test(&mut params).await; email_query_changes::test(&mut params).await;
email_copy::test(&mut params).await; email_copy::test(&mut params).await;
thread_get::test(&mut params).await; thread_get::test(&mut params).await;*/
thread_merge::test(&mut params).await; thread_merge::test(&mut params).await;
mailbox::test(&mut params).await; /*mailbox::test(&mut params).await;
delivery::test(&mut params).await; delivery::test(&mut params).await;
auth_acl::test(&mut params).await; auth_acl::test(&mut params).await;
auth_limits::test(&mut params).await; auth_limits::test(&mut params).await;
@ -304,24 +309,37 @@ pub async fn jmap_tests() {
websocket::test(&mut params).await; websocket::test(&mut params).await;
quota::test(&mut params).await; quota::test(&mut params).await;
crypto::test(&mut params).await; crypto::test(&mut params).await;
blob::test(&mut params).await; blob::test(&mut params).await;*/
if delete { if delete {
params.temp_dir.delete(); params.temp_dir.delete();
} }
} }
#[tokio::test] #[tokio::test(flavor = "multi_thread")]
#[ignore] #[ignore]
pub async fn jmap_stress_tests() { pub async fn jmap_stress_tests() {
tracing::subscriber::set_global_default( if let Ok(level) = std::env::var("LOG") {
tracing_subscriber::FmtSubscriber::builder() tracing::subscriber::set_global_default(
.with_max_level(tracing::Level::WARN) tracing_subscriber::FmtSubscriber::builder()
.finish(), .with_env_filter(
) tracing_subscriber::EnvFilter::builder()
.unwrap(); .parse(
format!("smtp={level},imap={level},jmap={level},store={level},utils={level},directory={level}"),
)
.unwrap(),
)
.finish(),
)
.unwrap();
}
let params = init_jmap_tests("foundationdb", true).await; let params = init_jmap_tests(
&std::env::var("STORE")
.expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
true,
)
.await;
stress_test::test(params.server.clone(), params.client).await; stress_test::test(params.server.clone(), params.client).await;
params.temp_dir.delete(); params.temp_dir.delete();
} }

View file

@ -24,6 +24,7 @@
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use crate::jmap::{mailbox::destroy_all_mailboxes_no_wait, wait_for_index}; use crate::jmap::{mailbox::destroy_all_mailboxes_no_wait, wait_for_index};
use directory::backend::internal::manage::ManageDirectory;
use futures::future::join_all; use futures::future::join_all;
use jmap::{mailbox::UidMailbox, JMAP}; use jmap::{mailbox::UidMailbox, JMAP};
use jmap_client::{ use jmap_client::{
@ -41,10 +42,9 @@ const NUM_PASSES: usize = 1;
pub async fn test(server: Arc<JMAP>, mut client: Client) { pub async fn test(server: Arc<JMAP>, mut client: Client) {
println!("Running concurrency stress tests..."); println!("Running concurrency stress tests...");
server.store.get_or_create_account_id("john").await.unwrap();
client.set_default_account_id(Id::from(TEST_USER_ID).to_string()); client.set_default_account_id(Id::from(TEST_USER_ID).to_string());
let client = Arc::new(client); let client = Arc::new(client);
email_tests(server.clone(), client.clone()).await; email_tests(server.clone(), client.clone()).await;
mailbox_tests(server.clone(), client.clone()).await; mailbox_tests(server.clone(), client.clone()).await;
} }
@ -355,7 +355,22 @@ async fn mailbox_tests(server: Arc<JMAP>, client: Arc<Client>) {
join_all(futures).await; join_all(futures).await;
wait_for_index(&server).await; wait_for_index(&server).await;
destroy_all_mailboxes_no_wait(&client).await; for mailbox_id in client
.mailbox_query(None::<mailbox::query::Filter>, None::<Vec<_>>)
.await
.unwrap()
.take_ids()
{
let _ = client.mailbox_move(&mailbox_id, None::<String>).await;
}
for mailbox_id in client
.mailbox_query(None::<mailbox::query::Filter>, None::<Vec<_>>)
.await
.unwrap()
.take_ids()
{
let _ = client.mailbox_destroy(&mailbox_id, true).await;
}
assert_is_empty(server).await; assert_is_empty(server).await;
} }
@ -409,11 +424,21 @@ async fn query_mailboxes(client: &Client) -> Vec<Mailbox> {
} }
async fn delete_mailbox(client: &Client, mailbox_id: &str) { async fn delete_mailbox(client: &Client, mailbox_id: &str) {
match client.mailbox_destroy(mailbox_id, true).await { for _ in 0..3 {
Ok(_) => (), match client.mailbox_destroy(mailbox_id, true).await {
Err(err) => match err { Ok(_) => return,
jmap_client::Error::Set(_) => (), Err(err) => match err {
_ => panic!("Failed: {:?}", err), jmap_client::Error::Set(_) => break,
}, jmap_client::Error::Transport(_) => {
let backoff = rand::thread_rng().gen_range(50..=300);
tokio::time::sleep(Duration::from_millis(backoff)).await;
}
_ => panic!("Failed: {:?}", err),
},
}
} }
/*println!(
"Warning: Too many transport errors while deleting mailbox {}.",
mailbox_id
);*/
} }

View file

@ -21,14 +21,29 @@
* for more details. * for more details.
*/ */
use crate::jmap::{assert_is_empty, mailbox::destroy_all_mailboxes}; use std::{io::Cursor, time::Duration};
use crate::{
jmap::{assert_is_empty, mailbox::destroy_all_mailboxes},
store::deflate_test_resource,
};
use jmap::{email::ingest::IngestEmail, IngestError};
use jmap_client::{email, mailbox::Role}; use jmap_client::{email, mailbox::Role};
use jmap_proto::types::id::Id; use jmap_proto::types::{collection::Collection, id::Id, property::Property};
use store::ahash::{AHashMap, AHashSet}; use mail_parser::{mailbox::mbox::MessageIterator, MessageParser};
use store::{
ahash::{AHashMap, AHashSet},
rand::{self, Rng},
};
use super::JMAPTest; use super::JMAPTest;
pub async fn test(params: &mut JMAPTest) { pub async fn test(params: &mut JMAPTest) {
test_single_thread(params).await;
test_multi_thread(params).await;
}
async fn test_single_thread(params: &mut JMAPTest) {
println!("Running Email Merge Threads tests..."); println!("Running Email Merge Threads tests...");
let server = params.server.clone(); let server = params.server.clone();
let client = &mut params.client; let client = &mut params.client;
@ -209,6 +224,87 @@ pub async fn test(params: &mut JMAPTest) {
assert_is_empty(server).await; assert_is_empty(server).await;
} }
#[allow(dead_code)]
async fn test_multi_thread(params: &mut JMAPTest) {
println!("Running Email Merge Threads tests (multi-threaded)...");
//let semaphore = sync::Arc::Arc::new(tokio::sync::Semaphore::new(100));
let mut handles = vec![];
let mailbox_id_str = params
.client
.set_default_account_id(Id::new(0u64).to_string())
.mailbox_create("Multi-thread nightmare", None::<String>, Role::None)
.await
.unwrap()
.take_id();
let mailbox_id = Id::from_bytes(mailbox_id_str.as_bytes())
.unwrap()
.document_id();
for message in MessageIterator::new(Cursor::new(deflate_test_resource("mailbox.gz")))
.collect::<Vec<_>>()
.into_iter()
{
//let permit = Arc::clone(&semaphore);
let message = message.unwrap();
let server = params.server.clone();
handles.push(tokio::task::spawn(async move {
//let _permit = permit.acquire().await.expect("Failed to acquire permit");
let mut retry_count = 0;
loop {
match server
.email_ingest(IngestEmail {
raw_message: message.contents(),
message: MessageParser::new().parse(message.contents()),
account_id: 0,
account_quota: 0,
mailbox_ids: vec![mailbox_id],
keywords: vec![],
received_at: None,
skip_duplicates: true,
encrypt: false,
})
.await
{
Ok(_) => break,
Err(IngestError::Temporary) if retry_count < 10 => {
//println!("Retrying ingest for {}...", message.from());
let backoff = rand::thread_rng().gen_range(50..=300);
tokio::time::sleep(Duration::from_millis(backoff)).await;
retry_count += 1;
continue;
}
Err(IngestError::Permanent { .. }) => {
panic!(
"Failed to ingest message: {:?} {}",
message.from(),
String::from_utf8_lossy(message.contents())
);
}
Err(err) => panic!("Failed to ingest message: {:?}", err),
}
}
}));
}
// Wait for all tasks to complete
let messages = handles.len();
println!("Waiting for {} tasks to complete...", messages);
for handle in handles {
handle.await.expect("Task panicked");
}
assert_eq!(
messages as u64,
params
.server
.get_tag(0, Collection::Email, Property::MailboxIds, mailbox_id,)
.await
.unwrap()
.unwrap()
.len()
);
println!("Deleting all messages...");
destroy_all_mailboxes(params).await;
assert_is_empty(params.server.clone()).await;
}
fn build_message(message: usize, in_reply_to: Option<usize>, thread_num: usize) -> String { fn build_message(message: usize, in_reply_to: Option<usize>, thread_num: usize) -> String {
if let Some(in_reply_to) = in_reply_to { if let Some(in_reply_to) = in_reply_to {
format!( format!(

View file

@ -82,32 +82,38 @@ url = "redis://127.0.0.1"
"#; "#;
#[tokio::test] #[tokio::test(flavor = "multi_thread")]
pub async fn store_tests() { pub async fn store_tests() {
let insert = true; let insert = true;
let temp_dir = TempDir::new("store_tests", insert); let temp_dir = TempDir::new("store_tests", insert);
let config = Config::new(&CONFIG.replace("{TMP}", &temp_dir.path.to_string_lossy())).unwrap(); let config = Config::new(&CONFIG.replace("{TMP}", &temp_dir.path.to_string_lossy())).unwrap();
let stores = config.parse_stores().await.unwrap(); let stores = config.parse_stores().await.unwrap();
for (store_id, store) in stores.stores { let store_id = std::env::var("STORE")
println!("Testing store {}...", store_id); .expect("Missing store type. Try running `STORE=<store_type> cargo test`");
if insert { let store = stores
store.destroy().await; .stores
} .get(&store_id)
ops::test(store.clone()).await; .expect("Store not found")
query::test(store.clone(), FtsStore::Store(store.clone()), insert).await; .clone();
assign_id::test(store).await;
println!("Testing store {}...", store_id);
if insert {
store.destroy().await;
} }
ops::test(store.clone()).await;
query::test(store.clone(), FtsStore::Store(store.clone()), insert).await;
assign_id::test(store).await;
if insert { if insert {
temp_dir.delete(); temp_dir.delete();
} }
} }
pub fn deflate_artwork_data() -> Vec<u8> { pub fn deflate_test_resource(name: &str) -> Vec<u8> {
let mut csv_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); let mut csv_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
csv_path.push("resources"); csv_path.push("resources");
csv_path.push("artwork_data.csv.gz"); csv_path.push(name);
let mut decoder = flate2::bufread::GzDecoder::new(std::io::BufReader::new( let mut decoder = flate2::bufread::GzDecoder::new(std::io::BufReader::new(
std::fs::File::open(csv_path).unwrap(), std::fs::File::open(csv_path).unwrap(),

View file

@ -43,7 +43,7 @@ use store::{
Store, ValueKey, Store, ValueKey,
}; };
use crate::store::deflate_artwork_data; use crate::store::deflate_test_resource;
pub const FIELDS: [&str; 20] = [ pub const FIELDS: [&str; 20] = [
"id", "id",
@ -139,7 +139,7 @@ pub async fn test(db: Store, fts_store: FtsStore, do_insert: bool) {
pool.scope_fifo(|s| { pool.scope_fifo(|s| {
for (document_id, record) in csv::ReaderBuilder::new() for (document_id, record) in csv::ReaderBuilder::new()
.has_headers(true) .has_headers(true)
.from_reader(&deflate_artwork_data()[..]) .from_reader(&deflate_test_resource("artwork_data.csv.gz")[..])
.records() .records()
.enumerate() .enumerate()
{ {