Support for concurrent index updates

This commit is contained in:
mdecimus 2024-04-28 19:52:02 +02:00
parent fb110fb078
commit 91f674d921
7 changed files with 168 additions and 59 deletions

View file

@ -50,9 +50,10 @@ use mail_parser::{parsers::fields::thread::thread_name, HeaderName, HeaderValue}
use store::{
write::{
log::{Changes, LogInsert},
BatchBuilder, Bincode, MaybeDynamicId, TagValue, ValueClass, F_BITMAP, F_VALUE,
BatchBuilder, Bincode, IndexEmailClass, MaybeDynamicId, TagValue, ValueClass, F_BITMAP,
F_VALUE,
},
BlobClass,
BlobClass, Serialize,
};
use utils::map::vec_map::VecMap;
@ -416,8 +417,11 @@ impl JMAP {
.value(Property::Keywords, keywords, F_VALUE | F_BITMAP)
.value(Property::Cid, change_id, F_VALUE)
.set(
ValueClass::IndexEmail(self.generate_snowflake_id()?),
metadata.blob_hash.as_ref(),
ValueClass::IndexEmail(IndexEmailClass::Insert {
seq: self.generate_snowflake_id()?,
hash: metadata.blob_hash.clone(),
}),
0u64.serialize(),
)
.custom(EmailIndexBuilder::set(metadata));

View file

@ -40,8 +40,8 @@ use store::{
query::Filter,
write::{
log::{ChangeLogBuilder, Changes, LogInsert},
now, AssignedIds, BatchBuilder, BitmapClass, MaybeDynamicId, MaybeDynamicValue,
SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
now, AssignedIds, BatchBuilder, BitmapClass, IndexEmailClass, MaybeDynamicId,
MaybeDynamicValue, SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
BitmapKey, BlobClass, Serialize,
};
@ -340,11 +340,13 @@ impl JMAP {
.set(Property::ThreadId, maybe_thread_id)
.tag(Property::ThreadId, TagValue::Id(maybe_thread_id), 0)
.set(
ValueClass::IndexEmail(
self.generate_snowflake_id()
ValueClass::IndexEmail(IndexEmailClass::Insert {
seq: self
.generate_snowflake_id()
.map_err(|_| IngestError::Temporary)?,
),
blob_id.hash.as_ref(),
hash: blob_id.hash.clone(),
}),
0u64.serialize(),
);
// Insert and obtain ids

View file

@ -54,7 +54,7 @@ use store::{
ahash::AHashSet,
write::{
assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, Bincode, DeserializeFrom,
SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
IndexEmailClass, SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
Serialize,
};
@ -1091,8 +1091,10 @@ impl JMAP {
.with_collection(Collection::Email)
.delete_document(document_id)
.set(
ValueClass::IndexEmail(self.generate_snowflake_id()?),
vec![],
ValueClass::IndexEmail(IndexEmailClass::Delete {
seq: self.generate_snowflake_id()?,
}),
0u64.serialize(),
);
// Remove last changeId

View file

@ -23,10 +23,15 @@
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
ahash::AHashSet,
fts::index::FtsDocument,
write::{key::DeserializeBigEndian, BatchBuilder, Bincode, ValueClass},
Deserialize, IterateParams, ValueKey, U32_LEN, U64_LEN,
write::{
key::DeserializeBigEndian, now, BatchBuilder, Bincode, IndexEmailClass, MaybeDynamicId,
ValueClass,
},
Deserialize, IterateParams, Serialize, ValueKey, U32_LEN, U64_LEN,
};
use utils::{BlobHash, BLOB_HASH_LEN};
use crate::{
email::{index::IndexMessageText, metadata::MessageMetadata},
@ -40,26 +45,31 @@ struct IndexEmail {
account_id: u32,
document_id: u32,
seq: u64,
lock_expiry: u64,
insert_hash: Option<BlobHash>,
}
// How far past `now()` a claimed index event's lock expiry is set; the sum
// `now() + INDEX_LOCK_EXPIRY` is stored as the event's value when it is locked.
// NOTE(review): presumably seconds (i.e. five minutes) — confirm the unit
// returned by `store::write::now()`.
const INDEX_LOCK_EXPIRY: u64 = 60 * 5;
impl JMAP {
pub async fn fts_index_queued(&self) {
let from_key = ValueKey::<ValueClass<u32>> {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::IndexEmail(0),
class: ValueClass::IndexEmail(IndexEmailClass::Delete { seq: 0 }),
};
let to_key = ValueKey::<ValueClass<u32>> {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::IndexEmail(u64::MAX),
class: ValueClass::IndexEmail(IndexEmailClass::Delete { seq: u64::MAX }),
};
// Retrieve entries pending to be indexed
// TODO: Support indexing from multiple nodes
let mut entries = Vec::new();
let mut skipped_documents = AHashSet::new();
let now = now();
let _ = self
.core
.storage
@ -67,7 +77,32 @@ impl JMAP {
.iterate(
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
entries.push((IndexEmail::deserialize(key)?, value.to_vec()));
let event = IndexEmail::deserialize(key, value)?;
if event.lock_expiry < now {
if !skipped_documents.contains(&(event.account_id, event.document_id)) {
entries.push(event);
} else {
tracing::trace!(
context = "queue",
event = "skipped",
account_id = event.account_id,
document_id = event.document_id,
"DocumentId already locked by another process."
);
}
} else {
skipped_documents.insert((event.account_id, event.document_id));
tracing::trace!(
context = "queue",
event = "locked",
account_id = event.account_id,
document_id = event.document_id,
expiry = event.lock_expiry - now,
"Index event locked by another process."
);
}
Ok(true)
},
)
@ -82,19 +117,24 @@ impl JMAP {
});
// Index entries
for (key, blob_hash) in entries {
if !blob_hash.is_empty() {
for event in entries {
// Lock index
if !self.try_lock_index(&event).await {
continue;
}
if let Some(hash) = &event.insert_hash {
match self
.get_property::<Bincode<MessageMetadata>>(
key.account_id,
event.account_id,
Collection::Email,
key.document_id,
event.document_id,
Property::BodyStructure,
)
.await
{
Ok(Some(metadata))
if metadata.inner.blob_hash.as_slice() == blob_hash.as_slice() =>
if metadata.inner.blob_hash.as_slice() == hash.as_slice() =>
{
// Obtain raw message
let raw_message = if let Ok(Some(raw_message)) = self
@ -106,8 +146,8 @@ impl JMAP {
tracing::warn!(
context = "fts_index_queued",
event = "error",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
blob_hash = ?metadata.inner.blob_hash,
"Message blob not found"
);
@ -118,16 +158,16 @@ impl JMAP {
// Index message
let document =
FtsDocument::with_default_language(self.core.jmap.default_language)
.with_account_id(key.account_id)
.with_account_id(event.account_id)
.with_collection(Collection::Email)
.with_document_id(key.document_id)
.with_document_id(event.document_id)
.index_message(&message);
if let Err(err) = self.core.storage.fts.index(document).await {
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
reason = ?err,
"Failed to index email in FTS index"
);
@ -137,8 +177,8 @@ impl JMAP {
tracing::debug!(
context = "fts_index_queued",
event = "index",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
"Indexed document in FTS index"
);
}
@ -147,8 +187,8 @@ impl JMAP {
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
reason = ?err,
"Failed to retrieve email metadata"
);
@ -159,8 +199,8 @@ impl JMAP {
tracing::debug!(
context = "fts_index_queued",
event = "error",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
"Email metadata not found"
);
}
@ -170,14 +210,18 @@ impl JMAP {
.core
.storage
.fts
.remove(key.account_id, Collection::Email.into(), key.document_id)
.remove(
event.account_id,
Collection::Email.into(),
event.document_id,
)
.await
{
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
reason = ?err,
"Failed to remove document from FTS index"
);
@ -187,8 +231,8 @@ impl JMAP {
tracing::debug!(
context = "fts_index_queued",
event = "delete",
account_id = key.account_id,
document_id = key.document_id,
account_id = event.account_id,
document_id = event.document_id,
"Deleted document from FTS index"
);
}
@ -200,10 +244,10 @@ impl JMAP {
.data
.write(
BatchBuilder::new()
.with_account_id(key.account_id)
.with_account_id(event.account_id)
.with_collection(Collection::Email)
.update_document(key.document_id)
.clear(ValueClass::IndexEmail(key.seq))
.update_document(event.document_id)
.clear(event.value_class())
.build_batch(),
)
.await
@ -222,15 +266,59 @@ impl JMAP {
tracing::warn!("Failed to send index done event to housekeeper: {}", err);
}
}
/// Tries to claim a queued index event for this process.
///
/// A single batch asserts the event's stored value against the
/// `lock_expiry` that was read for it and, in the same write, replaces
/// that value with `now() + INDEX_LOCK_EXPIRY`.
///
/// Returns `true` only when the write succeeds. A
/// `store::Error::AssertValueFailed` is logged at trace level and any
/// other store error at error level; both cases return `false`.
async fn try_lock_index(&self, event: &IndexEmail) -> bool {
    let mut lock_batch = BatchBuilder::new();
    lock_batch
        .with_account_id(event.account_id)
        .update_document(event.document_id)
        .assert_value(event.value_class(), event.lock_expiry)
        .set(event.value_class(), (now() + INDEX_LOCK_EXPIRY).serialize());

    let outcome = self.core.storage.data.write(lock_batch.build()).await;
    match outcome {
        // The lock was written: this process now owns the event.
        Ok(_) => return true,
        // The stored value no longer matches what was read, so the
        // assertion in the batch rejected the write.
        Err(store::Error::AssertValueFailed) => {
            tracing::trace!(
                context = "queue",
                event = "locked",
                account_id = event.account_id,
                document_id = event.document_id,
                "Failed to lock index: Index already locked."
            );
        }
        // Any other store failure is reported and treated as "not locked".
        Err(err) => {
            tracing::error!(
                context = "queue",
                event = "error",
                "Failed to lock index: {}",
                err
            );
        }
    }

    false
}
}
impl Deserialize for IndexEmail {
fn deserialize(bytes: &[u8]) -> store::Result<Self> {
let len = bytes.len();
impl IndexEmail {
/// Rebuilds the `ValueClass` key for this queued event.
///
/// An event carrying an `insert_hash` maps to `IndexEmailClass::Insert`
/// (with a clone of that hash); an event without one maps to
/// `IndexEmailClass::Delete`. Both variants reuse the event's `seq`.
fn value_class(&self) -> ValueClass<MaybeDynamicId> {
    let seq = self.seq;
    let class = if let Some(hash) = self.insert_hash.as_ref() {
        IndexEmailClass::Insert {
            hash: hash.clone(),
            seq,
        }
    } else {
        IndexEmailClass::Delete { seq }
    };

    ValueClass::IndexEmail(class)
}
fn deserialize(key: &[u8], value: &[u8]) -> store::Result<Self> {
Ok(IndexEmail {
seq: bytes.deserialize_be_u64(len - U64_LEN - (U32_LEN * 2))?,
account_id: bytes.deserialize_be_u32(len - U32_LEN * 2)?,
document_id: bytes.deserialize_be_u32(len - U32_LEN)?,
seq: key.deserialize_be_u64(0)?,
account_id: key.deserialize_be_u32(U64_LEN)?,
document_id: key.deserialize_be_u32(U64_LEN + U32_LEN)?,
lock_expiry: u64::deserialize(value)?,
insert_hash: key
.get(U64_LEN + U32_LEN + U32_LEN..U64_LEN + U32_LEN + U32_LEN + BLOB_HASH_LEN)
.and_then(|bytes| BlobHash::try_from_hash_slice(bytes).ok()),
})
}
}

View file

@ -101,7 +101,7 @@ impl SMTP {
if event.lock_expiry < now {
events.push(event);
} else {
tracing::debug!(
tracing::trace!(
context = "queue",
event = "locked",
id = event.queue_id,

View file

@ -34,8 +34,8 @@ use crate::{
};
use super::{
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass,
ReportEvent, ResolveId, TagValue, ValueClass,
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, IndexEmailClass, LookupClass,
QueueClass, ReportClass, ReportEvent, ResolveId, TagValue, ValueClass,
};
pub struct KeySerializer {
@ -271,9 +271,16 @@ impl<T: ResolveId> ValueClass<T> {
.write(account_id)
.write(collection)
.write(document_id),
ValueClass::IndexEmail(seq) => {
serializer.write(*seq).write(account_id).write(document_id)
}
ValueClass::IndexEmail(index) => match index {
IndexEmailClass::Insert { seq, hash } => serializer
.write(*seq)
.write(account_id)
.write(document_id)
.write::<&[u8]>(hash.as_ref()),
IndexEmailClass::Delete { seq } => {
serializer.write(*seq).write(account_id).write(document_id)
}
},
ValueClass::Blob(op) => match op {
BlobOp::Reserve { hash, until } => serializer
.write(account_id)
@ -527,7 +534,7 @@ impl<T> ValueClass<T> {
BlobOp::Reserve { .. } => BLOB_HASH_LEN + U64_LEN + U32_LEN + 1,
BlobOp::Commit { .. } | BlobOp::Link { .. } => BLOB_HASH_LEN + U32_LEN * 2 + 2,
},
ValueClass::IndexEmail { .. } => U64_LEN * 2,
ValueClass::IndexEmail { .. } => BLOB_HASH_LEN + U64_LEN * 2,
ValueClass::Queue(q) => match q {
QueueClass::Message(_) => U64_LEN,
QueueClass::MessageEvent(_) => U64_LEN * 2,
@ -554,7 +561,7 @@ impl<T> ValueClass<T> {
}
ValueClass::TermIndex => SUBSPACE_TERM_INDEX,
ValueClass::Acl(_) => SUBSPACE_ACL,
ValueClass::IndexEmail(_) => SUBSPACE_FTS_INDEX,
ValueClass::IndexEmail { .. } => SUBSPACE_FTS_INDEX,
ValueClass::Blob(op) => match op {
BlobOp::Reserve { .. } => SUBSPACE_BLOB_RESERVE,
BlobOp::Commit { .. } | BlobOp::Link { .. } => SUBSPACE_BLOB_LINK,

View file

@ -167,12 +167,18 @@ pub enum ValueClass<T> {
TermIndex,
Directory(DirectoryClass<T>),
Blob(BlobOp),
IndexEmail(u64),
IndexEmail(IndexEmailClass),
Config(Vec<u8>),
Queue(QueueClass),
Report(ReportClass),
}
/// Kind of queued email index event wrapped by `ValueClass::IndexEmail`.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum IndexEmailClass {
/// Request to index a message: `seq` is the event's sequence id and `hash`
/// is the blob hash of the message to be indexed.
Insert { seq: u64, hash: BlobHash },
/// Request to remove a document from the index; carries only the event's
/// sequence id.
Delete { seq: u64 },
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum LookupClass {
Key(Vec<u8>),