Fixed tests for FDB and SQL stores

This commit is contained in:
mdecimus 2025-05-23 17:07:36 +02:00
parent 2eb99ed3bd
commit c19d2ceb43
16 changed files with 113 additions and 42 deletions

View file

@ -328,7 +328,7 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
set: true,
});
if batch.len() >= 1000 {
if batch.is_large_batch() {
store
.write(batch.build_all())
.await

View file

@ -296,7 +296,7 @@ impl TracingStore for Store {
let mut batch = BatchBuilder::new();
for key in delete_keys {
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(batch.build_all()).await?;
batch = BatchBuilder::new();
}

View file

@ -196,7 +196,7 @@ impl ManageReports for Server {
batch.clear(ValueClass::Report(report_id));
if batch.len() > 1000 {
if batch.is_large_batch() {
if let Err(err) =
server.core.storage.data.write(batch.build_all()).await
{

View file

@ -716,20 +716,19 @@ async fn handle_session<T: SessionStream>(inner: Arc<Inner>, session: SessionDat
};
// Parse HTTP request
let response = match server
.parse_http_request(
req,
HttpSessionData {
instance,
local_ip: session.local_ip,
local_port: session.local_port,
remote_ip,
remote_port: session.remote_port,
is_tls,
session_id: session.session_id,
},
)
.await
let response = match Box::pin(server.parse_http_request(
req,
HttpSessionData {
instance,
local_ip: session.local_ip,
local_port: session.local_port,
remote_ip,
remote_port: session.remote_port,
is_tls,
session_id: session.session_id,
},
))
.await
{
Ok(response) => response,
Err(err) => {

View file

@ -50,7 +50,9 @@ impl MysqlStore {
&& start.elapsed() < MAX_COMMIT_TIME => {}
CommitError::Retry => {
if retry_count > MAX_COMMIT_ATTEMPTS || start.elapsed() > MAX_COMMIT_TIME {
return Err(trc::StoreEvent::AssertValueFailed.into());
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!()));
}
}
CommitError::Mysql(err) => {
@ -168,6 +170,7 @@ impl MysqlStore {
trx.rollback().await?;
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!())
.into());
}
}
@ -219,6 +222,11 @@ impl MysqlStore {
);
}
ValueOp::Clear => {
// Update asserted value
if let Some(exists) = asserted_values.get_mut(&key) {
*exists = false;
}
let s = trx
.prep(format!("DELETE FROM {} WHERE k = ?", table))
.await?;
@ -304,7 +312,10 @@ impl MysqlStore {
.unwrap_or_else(|| (false, assert_value.is_none()));
if !matches {
trx.rollback().await?;
return Err(trc::StoreEvent::AssertValueFailed.into_err().into());
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!())
.into());
}
asserted_values.insert(key, exists);
}

View file

@ -48,7 +48,9 @@ impl PostgresStore {
) if retry_count < MAX_COMMIT_ATTEMPTS
&& start.elapsed() < MAX_COMMIT_TIME => {}
Some(&SqlState::UNIQUE_VIOLATION) => {
return Err(trc::StoreEvent::AssertValueFailed.into());
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!()));
}
_ => return Err(into_error(err)),
},
@ -57,7 +59,9 @@ impl PostgresStore {
if retry_count > MAX_COMMIT_ATTEMPTS
|| start.elapsed() > MAX_COMMIT_TIME
{
return Err(trc::StoreEvent::AssertValueFailed.into());
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!()));
}
}
}
@ -165,7 +169,10 @@ impl PostgresStore {
};
if trx.execute(&s, &[&key, &(*value)]).await? == 0 {
return Err(trc::StoreEvent::AssertValueFailed.into_err().into());
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!())
.into());
}
}
ValueOp::AtomicAdd(by) => {
@ -210,6 +217,11 @@ impl PostgresStore {
.prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table))
.await?;
trx.execute(&s, &[&key]).await?;
// Update asserted value
if let Some(exists) = asserted_values.get_mut(&key) {
*exists = false;
}
}
}
}
@ -298,7 +310,10 @@ impl PostgresStore {
})
.unwrap_or_else(|| (false, assert_value.is_none()));
if !matches {
return Err(trc::StoreEvent::AssertValueFailed.into_err().into());
return Err(trc::StoreEvent::AssertValueFailed
.into_err()
.caused_by(trc::location!())
.into());
}
asserted_values.insert(key, exists);
}

View file

@ -398,7 +398,7 @@ impl InMemoryStore {
class: ValueClass::InMemory(InMemoryClass::Key(key)),
op: ValueOp::Clear,
});
if batch.len() >= 1000 {
if batch.is_large_batch() {
store
.write(batch.build_all())
.await
@ -425,7 +425,7 @@ impl InMemoryStore {
class: ValueClass::InMemory(InMemoryClass::Key(key)),
op: ValueOp::Clear,
});
if batch.len() >= 1000 {
if batch.is_large_batch() {
store
.write(batch.build_all())
.await

View file

@ -360,7 +360,7 @@ impl Store {
let mut batch = BatchBuilder::new();
for key in delete_keys {
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(std::mem::take(&mut batch).build_all())
.await
.caused_by(trc::location!())?;
@ -638,7 +638,7 @@ impl Store {
class: ValueClass::InMemory(InMemoryClass::Key(key)),
op: ValueOp::Clear,
});
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(batch.build_all()).await.unwrap();
batch = BatchBuilder::new();
}
@ -659,7 +659,7 @@ impl Store {
class: ValueClass::InMemory(InMemoryClass::Key(key)),
op: ValueOp::Clear,
});
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(batch.build_all()).await.unwrap();
batch = BatchBuilder::new();
}

View file

@ -217,7 +217,7 @@ impl Store {
.update_document(document.document_id);
for key in keys.into_iter() {
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(batch.build_all()).await?;
batch = BatchBuilder::new();
batch
@ -311,8 +311,10 @@ impl Store {
batch.update_document(document_id);
for key in keys {
if batch.len() >= 1000 {
self.write(batch.build_all()).await?;
if batch.is_large_batch() {
self.write(batch.build_all())
.await
.caused_by(trc::location!())?;
batch = BatchBuilder::new();
batch
.with_account_id(account_id)
@ -327,7 +329,9 @@ impl Store {
}
if !batch.is_empty() {
self.write(batch.build_all()).await?;
self.write(batch.build_all())
.await
.caused_by(trc::location!())?;
}
Ok(())

View file

@ -116,7 +116,7 @@ impl Store {
batch.with_account_id(account_id);
let mut last_collection = u8::MAX;
for (revoke_account_id, acl_item) in delete_keys.into_iter() {
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(batch.build_all())
.await
.caused_by(trc::location!())?;

View file

@ -355,7 +355,7 @@ impl BatchBuilder {
}
pub fn commit_point(&mut self) -> &mut Self {
if self.batch_size > 5_000_000 || self.batch_ops > 1000 {
if self.is_large_batch() {
self.serialize_changes();
self.commit_points.push(self.ops.len());
self.batch_ops = 0;
@ -370,6 +370,11 @@ impl BatchBuilder {
self
}
#[inline]
pub fn is_large_batch(&self) -> bool {
self.batch_size > 5_000_000 || self.batch_ops > 1000
}
pub fn any_op(&mut self, op: Operation) -> &mut Self {
self.ops.push(op);
self.batch_ops += 1;

View file

@ -215,7 +215,7 @@ impl Store {
let mut batch = BatchBuilder::new();
let mut last_account_id = u32::MAX;
for (account_id, op) in delete_keys.into_iter() {
if batch.len() >= 1000 {
if batch.is_large_batch() {
last_account_id = u32::MAX;
self.write(batch.build_all())
.await
@ -290,7 +290,7 @@ impl Store {
batch.with_account_id(account_id);
let mut last_collection = u8::MAX;
for (collection, document_id, op) in delete_keys.into_iter() {
if batch.len() >= 1000 {
if batch.is_large_batch() {
self.write(batch.build_all())
.await
.caused_by(trc::location!())?;

View file

@ -90,7 +90,7 @@ async fn jmap_tests() {
.await;
webhooks::test(&mut params).await;
/*email_query::test(&mut params, delete).await;
email_query::test(&mut params, delete).await;
email_get::test(&mut params).await;
email_set::test(&mut params).await;
email_parse::test(&mut params).await;
@ -105,7 +105,7 @@ async fn jmap_tests() {
auth_acl::test(&mut params).await;
auth_limits::test(&mut params).await;
auth_oauth::test(&mut params).await;
event_source::test(&mut params).await;*/
event_source::test(&mut params).await;
push_subscription::test(&mut params).await;
sieve_script::test(&mut params).await;
vacation_response::test(&mut params).await;

View file

@ -58,9 +58,9 @@ pub async fn test(test: &WebDavTest) {
"D:prop.D:lockdiscovery.D:activelock.D:owner.D:href",
"super-owner",
)
.with_value(
.with_any_value(
"D:prop.D:lockdiscovery.D:activelock.D:timeout",
"Second-456",
["Second-456", "Second-455"],
);
// Test 3: Creating a collection under an unmapped resource with a lock token should fail
@ -116,14 +116,17 @@ pub async fn test(test: &WebDavTest) {
let props = response.properties(&href);
props
.get(DavProperty::WebDav(WebDavProperty::LockDiscovery))
.with_values([
.with_some_values([
"D:activelock.D:owner.D:href:super-owner",
"D:activelock.D:timeout:Second-456",
"D:activelock.D:depth:infinity",
format!("D:activelock.D:locktoken.D:href:{lock_token}").as_str(),
format!("D:activelock.D:lockroot.D:href:{path}").as_str(),
"D:activelock.D:locktype.D:write",
"D:activelock.D:lockscope.D:exclusive",
])
.with_any_values([
"D:activelock.D:timeout:Second-456",
"D:activelock.D:timeout:Second-455",
]);
}

View file

@ -641,6 +641,24 @@ impl DavResponse {
self
}
pub fn with_any_value<'x>(
self,
query: &str,
expect: impl IntoIterator<Item = &'x str>,
) -> Self {
let expect = expect.into_iter().collect::<AHashSet<_>>();
if let Some(value) = self.find_keys(query).next() {
if !expect.contains(value) {
self.dump_response();
panic!("Expected {query} = {expect:?} but got {value:?}");
}
} else {
self.dump_response();
panic!("Key {query} not found.");
}
self
}
pub fn with_values<I, T>(self, query: &str, expect: I) -> Self
where
I: IntoIterator<Item = T>,

View file

@ -837,6 +837,22 @@ impl<'x> DavQueryResult<'x> {
self
}
pub fn with_any_values(&self, expected_values: impl IntoIterator<Item = &'x str>) -> &Self {
let values = self
.values
.iter()
.map(|s| s.as_str())
.collect::<AHashSet<_>>();
let expected_values = AHashSet::from_iter(expected_values);
if values.is_disjoint(&expected_values) {
self.response.dump_response();
panic!("Expected at least one of {expected_values:?} values, but got {values:?}",);
}
self
}
pub fn without_values(&self, expected_values: impl IntoIterator<Item = &'x str>) -> &Self {
let expected_values = AHashSet::from_iter(expected_values);
let values = self