From d4327af3add914032cfb1bf737b835181e9e08dc Mon Sep 17 00:00:00 2001 From: mdecimus Date: Mon, 13 May 2024 15:36:59 +0200 Subject: [PATCH] Log errors during message ingestion --- crates/common/src/manager/backup.rs | 6 +- crates/jmap/src/services/ingest.rs | 31 ++++++++-- crates/store/src/fts/index.rs | 88 +---------------------------- tests/src/jmap/mod.rs | 4 +- 4 files changed, 32 insertions(+), 97 deletions(-) diff --git a/crates/common/src/manager/backup.rs b/crates/common/src/manager/backup.rs index cb116ae8..19c96ec0 100644 --- a/crates/common/src/manager/backup.rs +++ b/crates/common/src/manager/backup.rs @@ -91,7 +91,7 @@ impl Core { for (async_handle, sync_handle) in [ self.backup_properties(&dest), - self.backup_term_index(&dest), + self.backup_fts_index(&dest), self.backup_acl(&dest), self.backup_blob(&dest), self.backup_config(&dest), @@ -223,9 +223,9 @@ impl Core { ) } - fn backup_term_index(&self, dest: &Path) -> TaskHandle { + fn backup_fts_index(&self, dest: &Path) -> TaskHandle { let store = self.storage.data.clone(); - let (handle, writer) = spawn_writer(dest.join("term_index")); + let (handle, writer) = spawn_writer(dest.join("fts_index")); ( tokio::spawn(async move { writer diff --git a/crates/jmap/src/services/ingest.rs b/crates/jmap/src/services/ingest.rs index aeefc8a7..90aaeb36 100644 --- a/crates/jmap/src/services/ingest.rs +++ b/crates/jmap/src/services/ingest.rs @@ -40,7 +40,14 @@ impl JMAP { .await { Ok(Some(raw_message)) => raw_message, - _ => { + result => { + tracing::error!( + context = "ingest", + rcpts = ?message.recipients, + error = ?result, + "Failed to fetch message blob." + ); + return (0..message.recipients.len()) .map(|_| DeliveryResult::TemporaryFailure { reason: "Temporary I/O error.".into(), @@ -53,15 +60,27 @@ impl JMAP { let mut recipients = Vec::with_capacity(message.recipients.len()); let mut deliver_names = AHashMap::with_capacity(message.recipients.len()); for rcpt in &message.recipients { - let uids = self + match self .core .email_to_ids(&self.core.storage.directory, rcpt) .await - .unwrap_or_default(); - for uid in &uids { - deliver_names.insert(*uid, (DeliveryResult::Success, rcpt)); + { + Ok(uids) => { + for uid in &uids { + deliver_names.insert(*uid, (DeliveryResult::Success, rcpt)); + } + recipients.push(uids); + } + Err(err) => { + tracing::error!( + context = "ingest", + error = ?err, + rcpt = rcpt, + "Failed to lookup recipient" + ); + recipients.push(vec![]); + } } - recipients.push(uids); } // Deliver to each recipient diff --git a/crates/store/src/fts/index.rs b/crates/store/src/fts/index.rs index 4d42eac3..e0abe188 100644 --- a/crates/store/src/fts/index.rs +++ b/crates/store/src/fts/index.rs @@ -206,35 +206,16 @@ impl Store { return Ok(()); } - // Serialize tokens - //let mut term_index = KeySerializer::new(tokens.len() * U64_LEN * 2); + // Serialize keys let mut keys = Vec::with_capacity(tokens.len()); - - // Write index keys for (hash, postings) in tokens.into_iter() { - //term_index = term_index.write(hash.hash.as_slice()).write(hash.len); keys.push(Operation::Value { class: ValueClass::FtsIndex(hash), op: ValueOp::Set(postings.serialize().into()), }); } - // Write term index - /*let mut batch = BatchBuilder::new(); - let mut term_index = lz4_flex::compress_prepend_size(&term_index.finalize()); - term_index.insert(0, TERM_INDEX_VERSION); - batch - .with_account_id(document.account_id) - .with_collection(document.collection) - .update_document(document.document_id) - .set( - ValueClass::FtsIndex(BitmapHash { - hash: [u8::MAX; 8], - len: u8::MAX, - }), - term_index, - ); - self.write(batch.build()).await?;*/ + // Commit index let mut batch = BatchBuilder::new(); batch .with_account_id(document.account_id) @@ -358,68 +339,3 @@ impl Store { Ok(()) } } - -/* -struct TermIndex { - ops: Vec, -} - -impl Deserialize for TermIndex { - fn deserialize(bytes: &[u8]) -> crate::Result { - if bytes.first().copied().unwrap_or_default() != TERM_INDEX_VERSION { - return Err(Error::InternalError( - "Unsupported term index version".to_string(), - )); - } - let bytes = lz4_flex::decompress_size_prepended(bytes.get(1..).unwrap_or_default()) - .map_err(|_| Error::InternalError("Failed to decompress term index".to_string()))?; - let mut ops = Vec::new(); - - // Skip bigrams - let (num_items, pos) = - bytes - .as_slice() - .read_leb128::() - .ok_or(Error::InternalError( - "Failed to read term index marker".to_string(), - ))?; - - let mut bytes = bytes - .get(pos + (num_items * 8)..) - .unwrap_or_default() - .iter() - .peekable(); - - while bytes.peek().is_some() { - let mut hash = BitmapHash { - hash: [0; 8], - len: 0, - }; - - for byte in hash.hash.iter_mut() { - *byte = *bytes.next().ok_or(Error::InternalError( - "Unexpected EOF reading term index".to_string(), - ))?; - } - - hash.len = *bytes.next().ok_or(Error::InternalError( - "Unexpected EOF reading term index".to_string(), - ))?; - let num_fields = *bytes.next().ok_or(Error::InternalError( - "Unexpected EOF reading term index".to_string(), - ))?; - for _ in 0..num_fields { - let field = *bytes.next().ok_or(Error::InternalError( - "Unexpected EOF reading term index".to_string(), - ))?; - ops.push(Operation::Bitmap { - class: BitmapClass::Text { field, token: hash }, - set: false, - }); - } - } - - Ok(Self { ops }) - } -} -*/ diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index cbf8aaa3..834d87e7 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -184,8 +184,8 @@ password = "password" type = "elasticsearch" url = "https://localhost:9200" user = "elastic" -password = "RtQ-Lu6+o4rxx=XJplVJ" -allow-invalid-certs = true +password = "changeme" +tls.allow-invalid-certs = true disable = true [certificate.default]