diff --git a/crates/common/src/manager/restore.rs b/crates/common/src/manager/restore.rs
index 93989559..869353de 100644
--- a/crates/common/src/manager/restore.rs
+++ b/crates/common/src/manager/restore.rs
@@ -328,7 +328,7 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
                     set: true,
                 });
 
-                if batch.len() >= 1000 {
+                if batch.is_large_batch() {
                     store
                         .write(batch.build_all())
                         .await
diff --git a/crates/common/src/telemetry/tracers/store.rs b/crates/common/src/telemetry/tracers/store.rs
index 04ef4ab3..fd7036d6 100644
--- a/crates/common/src/telemetry/tracers/store.rs
+++ b/crates/common/src/telemetry/tracers/store.rs
@@ -296,7 +296,7 @@ impl TracingStore for Store {
         let mut batch = BatchBuilder::new();
 
         for key in delete_keys {
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(batch.build_all()).await?;
                 batch = BatchBuilder::new();
             }
diff --git a/crates/http/src/management/report.rs b/crates/http/src/management/report.rs
index 1e5e50d0..b477d75a 100644
--- a/crates/http/src/management/report.rs
+++ b/crates/http/src/management/report.rs
@@ -196,7 +196,7 @@ impl ManageReports for Server {
 
                         batch.clear(ValueClass::Report(report_id));
 
-                        if batch.len() > 1000 {
+                        if batch.is_large_batch() {
                             if let Err(err) =
                                 server.core.storage.data.write(batch.build_all()).await
                             {
diff --git a/crates/http/src/request.rs b/crates/http/src/request.rs
index dbe10f1b..3c4102ed 100644
--- a/crates/http/src/request.rs
+++ b/crates/http/src/request.rs
@@ -716,20 +716,19 @@ async fn handle_session<T: SessionStream>(inner: Arc<Inner>, session: SessionDat
         };
 
         // Parse HTTP request
-        let response = match server
-            .parse_http_request(
-                req,
-                HttpSessionData {
-                    instance,
-                    local_ip: session.local_ip,
-                    local_port: session.local_port,
-                    remote_ip,
-                    remote_port: session.remote_port,
-                    is_tls,
-                    session_id: session.session_id,
-                },
-            )
-            .await
+        let response = match Box::pin(server.parse_http_request(
+            req,
+            HttpSessionData {
+                instance,
+                local_ip: session.local_ip,
+                local_port: session.local_port,
+                remote_ip,
+                remote_port: session.remote_port,
+                is_tls,
+                session_id: session.session_id,
+            },
+        ))
+        .await
         {
             Ok(response) => response,
            Err(err) => {
diff --git a/crates/store/src/backend/mysql/write.rs b/crates/store/src/backend/mysql/write.rs
index 2f030cb3..18d681bc 100644
--- a/crates/store/src/backend/mysql/write.rs
+++ b/crates/store/src/backend/mysql/write.rs
@@ -50,7 +50,9 @@ impl MysqlStore {
                         && start.elapsed() < MAX_COMMIT_TIME => {}
                     CommitError::Retry => {
                         if retry_count > MAX_COMMIT_ATTEMPTS || start.elapsed() > MAX_COMMIT_TIME {
-                            return Err(trc::StoreEvent::AssertValueFailed.into());
+                            return Err(trc::StoreEvent::AssertValueFailed
+                                .into_err()
+                                .caused_by(trc::location!()));
                         }
                     }
                     CommitError::Mysql(err) => {
@@ -168,6 +170,7 @@ impl MysqlStore {
                                 trx.rollback().await?;
                                 return Err(trc::StoreEvent::AssertValueFailed
                                     .into_err()
+                                    .caused_by(trc::location!())
                                     .into());
                             }
                         }
@@ -219,6 +222,11 @@ impl MysqlStore {
                             );
                         }
                         ValueOp::Clear => {
+                            // Update asserted value
+                            if let Some(exists) = asserted_values.get_mut(&key) {
+                                *exists = false;
+                            }
+
                             let s = trx
                                 .prep(format!("DELETE FROM {} WHERE k = ?", table))
                                 .await?;
@@ -304,7 +312,10 @@ impl MysqlStore {
                             .unwrap_or_else(|| (false, assert_value.is_none()));
                         if !matches {
                             trx.rollback().await?;
-                            return Err(trc::StoreEvent::AssertValueFailed.into_err().into());
+                            return Err(trc::StoreEvent::AssertValueFailed
+                                .into_err()
+                                .caused_by(trc::location!())
+                                .into());
                         }
                         asserted_values.insert(key, exists);
                     }
diff --git a/crates/store/src/backend/postgres/write.rs b/crates/store/src/backend/postgres/write.rs
index 6a46ae47..2762c99d 100644
--- a/crates/store/src/backend/postgres/write.rs
+++ b/crates/store/src/backend/postgres/write.rs
@@ -48,7 +48,9 @@ impl PostgresStore {
                    ) if retry_count < MAX_COMMIT_ATTEMPTS
                        && start.elapsed() < MAX_COMMIT_TIME => {}
                    Some(&SqlState::UNIQUE_VIOLATION) => {
-                        return Err(trc::StoreEvent::AssertValueFailed.into());
+                        return Err(trc::StoreEvent::AssertValueFailed
+                            .into_err()
+                            .caused_by(trc::location!()));
                     }
                     _ => return Err(into_error(err)),
                 },
@@ -57,7 +59,9 @@ impl PostgresStore {
                     if retry_count > MAX_COMMIT_ATTEMPTS
                         || start.elapsed() > MAX_COMMIT_TIME
                     {
-                        return Err(trc::StoreEvent::AssertValueFailed.into());
+                        return Err(trc::StoreEvent::AssertValueFailed
+                            .into_err()
+                            .caused_by(trc::location!()));
                     }
                 }
             }
@@ -165,7 +169,10 @@ impl PostgresStore {
                            };
 
                            if trx.execute(&s, &[&key, &(*value)]).await? == 0 {
-                                return Err(trc::StoreEvent::AssertValueFailed.into_err().into());
+                                return Err(trc::StoreEvent::AssertValueFailed
+                                    .into_err()
+                                    .caused_by(trc::location!())
+                                    .into());
                             }
                         }
                         ValueOp::AtomicAdd(by) => {
@@ -210,6 +217,11 @@ impl PostgresStore {
                                .prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table))
                                .await?;
                            trx.execute(&s, &[&key]).await?;
+
+                            // Update asserted value
+                            if let Some(exists) = asserted_values.get_mut(&key) {
+                                *exists = false;
+                            }
                         }
                     }
                 }
@@ -298,7 +310,10 @@ impl PostgresStore {
                            })
                            .unwrap_or_else(|| (false, assert_value.is_none()));
                        if !matches {
-                            return Err(trc::StoreEvent::AssertValueFailed.into_err().into());
+                            return Err(trc::StoreEvent::AssertValueFailed
+                                .into_err()
+                                .caused_by(trc::location!())
+                                .into());
                         }
                         asserted_values.insert(key, exists);
                     }
diff --git a/crates/store/src/dispatch/lookup.rs b/crates/store/src/dispatch/lookup.rs
index aef1b356..a0079588 100644
--- a/crates/store/src/dispatch/lookup.rs
+++ b/crates/store/src/dispatch/lookup.rs
@@ -398,7 +398,7 @@ impl InMemoryStore {
                        class: ValueClass::InMemory(InMemoryClass::Key(key)),
                        op: ValueOp::Clear,
                    });
-                    if batch.len() >= 1000 {
+                    if batch.is_large_batch() {
                         store
                             .write(batch.build_all())
                             .await
@@ -425,7 +425,7 @@ impl InMemoryStore {
                        class: ValueClass::InMemory(InMemoryClass::Key(key)),
                        op: ValueOp::Clear,
                    });
-                    if batch.len() >= 1000 {
+                    if batch.is_large_batch() {
                         store
                             .write(batch.build_all())
                             .await
diff --git a/crates/store/src/dispatch/store.rs b/crates/store/src/dispatch/store.rs
index b25d012d..d369e64e 100644
--- a/crates/store/src/dispatch/store.rs
+++ b/crates/store/src/dispatch/store.rs
@@ -360,7 +360,7 @@ impl Store {
        let mut batch = BatchBuilder::new();
 
        for key in delete_keys {
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(std::mem::take(&mut batch).build_all())
                     .await
                     .caused_by(trc::location!())?;
@@ -638,7 +638,7 @@ impl Store {
                class: ValueClass::InMemory(InMemoryClass::Key(key)),
                op: ValueOp::Clear,
            });
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(batch.build_all()).await.unwrap();
                 batch = BatchBuilder::new();
             }
@@ -659,7 +659,7 @@ impl Store {
                class: ValueClass::InMemory(InMemoryClass::Key(key)),
                op: ValueOp::Clear,
            });
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(batch.build_all()).await.unwrap();
                 batch = BatchBuilder::new();
             }
diff --git a/crates/store/src/fts/index.rs b/crates/store/src/fts/index.rs
index c6aa956a..a7423fc3 100644
--- a/crates/store/src/fts/index.rs
+++ b/crates/store/src/fts/index.rs
@@ -217,7 +217,7 @@ impl Store {
            .update_document(document.document_id);
 
        for key in keys.into_iter() {
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(batch.build_all()).await?;
                 batch = BatchBuilder::new();
                 batch
@@ -311,8 +311,10 @@ impl Store {
        batch.update_document(document_id);
 
        for key in keys {
-            if batch.len() >= 1000 {
-                self.write(batch.build_all()).await?;
+            if batch.is_large_batch() {
+                self.write(batch.build_all())
+                    .await
+                    .caused_by(trc::location!())?;
                 batch = BatchBuilder::new();
                 batch
                     .with_account_id(account_id)
@@ -327,7 +329,9 @@ impl Store {
        }
 
        if !batch.is_empty() {
-            self.write(batch.build_all()).await?;
+            self.write(batch.build_all())
+                .await
+                .caused_by(trc::location!())?;
         }
 
         Ok(())
diff --git a/crates/store/src/query/acl.rs b/crates/store/src/query/acl.rs
index e4525af9..5f38e70d 100644
--- a/crates/store/src/query/acl.rs
+++ b/crates/store/src/query/acl.rs
@@ -116,7 +116,7 @@ impl Store {
        batch.with_account_id(account_id);
        let mut last_collection = u8::MAX;
        for (revoke_account_id, acl_item) in delete_keys.into_iter() {
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(batch.build_all())
                     .await
                     .caused_by(trc::location!())?;
diff --git a/crates/store/src/write/batch.rs b/crates/store/src/write/batch.rs
index 67f8cfc8..71348088 100644
--- a/crates/store/src/write/batch.rs
+++ b/crates/store/src/write/batch.rs
@@ -355,7 +355,7 @@ impl BatchBuilder {
     }
 
     pub fn commit_point(&mut self) -> &mut Self {
-        if self.batch_size > 5_000_000 || self.batch_ops > 1000 {
+        if self.is_large_batch() {
             self.serialize_changes();
             self.commit_points.push(self.ops.len());
             self.batch_ops = 0;
@@ -370,6 +370,11 @@ impl BatchBuilder {
         self
     }
 
+    #[inline]
+    pub fn is_large_batch(&self) -> bool {
+        self.batch_size > 5_000_000 || self.batch_ops > 1000
+    }
+
     pub fn any_op(&mut self, op: Operation) -> &mut Self {
         self.ops.push(op);
         self.batch_ops += 1;
diff --git a/crates/store/src/write/blob.rs b/crates/store/src/write/blob.rs
index ae2bccc6..cf20cb08 100644
--- a/crates/store/src/write/blob.rs
+++ b/crates/store/src/write/blob.rs
@@ -215,7 +215,7 @@ impl Store {
        let mut batch = BatchBuilder::new();
        let mut last_account_id = u32::MAX;
        for (account_id, op) in delete_keys.into_iter() {
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 last_account_id = u32::MAX;
                 self.write(batch.build_all())
                     .await
@@ -290,7 +290,7 @@ impl Store {
        batch.with_account_id(account_id);
        let mut last_collection = u8::MAX;
        for (collection, document_id, op) in delete_keys.into_iter() {
-            if batch.len() >= 1000 {
+            if batch.is_large_batch() {
                 self.write(batch.build_all())
                     .await
                     .caused_by(trc::location!())?;
diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs
index befc598b..ec4ec3cd 100644
--- a/tests/src/jmap/mod.rs
+++ b/tests/src/jmap/mod.rs
@@ -90,7 +90,7 @@ async fn jmap_tests() {
        .await;
    webhooks::test(&mut params).await;
 
-    /*email_query::test(&mut params, delete).await;
+    email_query::test(&mut params, delete).await;
     email_get::test(&mut params).await;
     email_set::test(&mut params).await;
     email_parse::test(&mut params).await;
@@ -105,7 +105,7 @@ async fn jmap_tests() {
    auth_acl::test(&mut params).await;
    auth_limits::test(&mut params).await;
    auth_oauth::test(&mut params).await;
-    event_source::test(&mut params).await;*/
+    event_source::test(&mut params).await;
     push_subscription::test(&mut params).await;
     sieve_script::test(&mut params).await;
     vacation_response::test(&mut params).await;
diff --git a/tests/src/webdav/lock.rs b/tests/src/webdav/lock.rs
index 1c4ec594..b6a973a5 100644
--- a/tests/src/webdav/lock.rs
+++ b/tests/src/webdav/lock.rs
@@ -58,9 +58,9 @@ pub async fn test(test: &WebDavTest) {
            "D:prop.D:lockdiscovery.D:activelock.D:owner.D:href",
            "super-owner",
        )
-        .with_value(
+        .with_any_value(
             "D:prop.D:lockdiscovery.D:activelock.D:timeout",
-            "Second-456",
+            ["Second-456", "Second-455"],
         );
 
     // Test 3: Creating a collection under an unmapped resource with a lock token should fail
@@ -116,14 +116,17 @@ pub async fn test(test: &WebDavTest) {
        let props = response.properties(&href);
        props
            .get(DavProperty::WebDav(WebDavProperty::LockDiscovery))
-            .with_values([
+            .with_some_values([
                 "D:activelock.D:owner.D:href:super-owner",
-                "D:activelock.D:timeout:Second-456",
                 "D:activelock.D:depth:infinity",
                 format!("D:activelock.D:locktoken.D:href:{lock_token}").as_str(),
                 format!("D:activelock.D:lockroot.D:href:{path}").as_str(),
                 "D:activelock.D:locktype.D:write",
                 "D:activelock.D:lockscope.D:exclusive",
+            ])
+            .with_any_values([
+                "D:activelock.D:timeout:Second-456",
+                "D:activelock.D:timeout:Second-455",
             ]);
     }
 
diff --git a/tests/src/webdav/mod.rs b/tests/src/webdav/mod.rs
index 467b9cfb..2640eb96 100644
--- a/tests/src/webdav/mod.rs
+++ b/tests/src/webdav/mod.rs
@@ -641,6 +641,24 @@ impl DavResponse {
         self
     }
 
+    pub fn with_any_value<'x>(
+        self,
+        query: &str,
+        expect: impl IntoIterator<Item = &'x str>,
+    ) -> Self {
+        let expect = expect.into_iter().collect::<AHashSet<_>>();
+        if let Some(value) = self.find_keys(query).next() {
+            if !expect.contains(value) {
+                self.dump_response();
+                panic!("Expected {query} = {expect:?} but got {value:?}");
+            }
+        } else {
+            self.dump_response();
+            panic!("Key {query} not found.");
+        }
+        self
+    }
+
     pub fn with_values<I>(self, query: &str, expect: I) -> Self
     where
         I: IntoIterator,
diff --git a/tests/src/webdav/prop.rs b/tests/src/webdav/prop.rs
index 427a0b13..e1c89a66 100644
--- a/tests/src/webdav/prop.rs
+++ b/tests/src/webdav/prop.rs
@@ -837,6 +837,22 @@ impl<'x> DavQueryResult<'x> {
         self
     }
 
+    pub fn with_any_values(&self, expected_values: impl IntoIterator<Item = &str>) -> &Self {
+        let values = self
+            .values
+            .iter()
+            .map(|s| s.as_str())
+            .collect::<AHashSet<_>>();
+        let expected_values = AHashSet::from_iter(expected_values);
+
+        if values.is_disjoint(&expected_values) {
+            self.response.dump_response();
+            panic!("Expected at least one of {expected_values:?} values, but got {values:?}",);
+        }
+
+        self
+    }
+
     pub fn without_values(&self, expected_values: impl IntoIterator<Item = &str>) -> &Self {
         let expected_values = AHashSet::from_iter(expected_values);
         let values = self