Iterate values rather than sending multiple get requests

mdecimus 2024-03-05 10:35:16 +01:00
parent ff279b3a39
commit 48f255b31f
18 changed files with 260 additions and 212 deletions

View file

@@ -133,36 +133,29 @@ impl<T: SessionStream> SessionData<T> {
// Fetch mailboxes
let mut mailboxes = Vec::with_capacity(10);
for mailbox_id in mailbox_ids {
mailboxes.push(
match self
.jmap
.get_property::<Object<Value>>(
account_id,
Collection::Mailbox,
mailbox_id,
&Property::Value,
)
.await
.map_err(|_| {})?
{
Some(values) => (
mailbox_id,
values
.properties
.get(&Property::ParentId)
.map(|parent_id| match parent_id {
Value::Id(value) => value.document_id(),
_ => 0,
})
.unwrap_or(0),
values,
),
None => {
continue;
}
},
);
for (mailbox_id, values) in self
.jmap
.get_properties::<Object<Value>, _, _>(
account_id,
Collection::Mailbox,
&mailbox_ids,
Property::Value,
)
.await
.map_err(|_| {})?
{
mailboxes.push((
mailbox_id,
values
.properties
.get(&Property::ParentId)
.map(|parent_id| match parent_id {
Value::Id(value) => value.document_id(),
_ => 0,
})
.unwrap_or(0),
values,
));
}
// Build tree

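The hunk above replaces one get_property round trip per mailbox with a single get_properties call that yields (mailbox_id, values) pairs, so missing mailboxes no longer need the explicit None/continue branch. A minimal, self-contained sketch of that access pattern, using a std BTreeMap as a stand-in for the store (the names are illustrative, not Stalwart's API):

use std::collections::BTreeMap;

fn main() {
    // Stand-in store: document_id -> serialized mailbox value.
    let store: BTreeMap<u32, &str> = BTreeMap::from([(1, "Inbox"), (3, "Sent"), (7, "Drafts")]);
    let mailbox_ids = [1u32, 7];

    // Old shape: one point lookup per id, each returning an Option.
    let _per_id: Vec<Option<&&str>> = mailbox_ids.iter().map(|id| store.get(id)).collect();

    // New shape: one ranged scan bounded by the smallest and largest id,
    // filtered to the requested set, yielding (id, value) pairs.
    let (min, max) = (1u32, 7u32);
    let fetched: Vec<(u32, &str)> = store
        .range(min..=max)
        .filter(|(id, _)| mailbox_ids.contains(id))
        .map(|(id, value)| (*id, *value))
        .collect();
    assert_eq!(fetched, vec![(1, "Inbox"), (7, "Drafts")]);
}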
View file

@@ -103,38 +103,35 @@ impl<T: SessionStream> SessionData<T> {
let mut unassigned = Vec::new();
// Obtain all message ids
for (uid_mailbox, message_id) in self
for (message_id, uid_mailbox) in self
.jmap
.get_properties::<HashedValue<Vec<UidMailbox>>>(
.get_properties::<HashedValue<Vec<UidMailbox>>, _, _>(
mailbox.account_id,
Collection::Email,
message_ids.iter(),
&message_ids,
Property::MailboxIds,
)
.await?
.into_iter()
.zip(message_ids.iter())
{
// Make sure the message is still in this mailbox
if let Some(uid_mailbox) = uid_mailbox {
if let Some(item) = uid_mailbox
.inner
.iter()
.find(|item| item.mailbox_id == mailbox.mailbox_id)
{
if item.uid > 0 {
if assigned.insert(item.uid, message_id).is_some() {
tracing::warn!(event = "error",
if let Some(item) = uid_mailbox
.inner
.iter()
.find(|item| item.mailbox_id == mailbox.mailbox_id)
{
if item.uid > 0 {
if assigned.insert(item.uid, message_id).is_some() {
tracing::warn!(event = "error",
context = "store",
account_id = mailbox.account_id,
collection = ?Collection::Mailbox,
mailbox_id = mailbox.mailbox_id,
message_id = message_id,
"Duplicate UID");
}
} else {
unassigned.push((message_id, uid_mailbox));
}
} else {
unassigned.push((message_id, uid_mailbox));
}
}
}

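Because the new call already returns (message_id, value) pairs, the positional zip over message_ids and the outer Option check fall away: documents with no stored MailboxIds value simply do not appear in the result. A small stand-alone illustration of that difference (plain Vec/Option stand-ins, not the store API):

fn main() {
    let message_ids = [10u32, 11, 12];

    // Old shape: one Option per requested id, aligned with the ids by position.
    let by_position: Vec<Option<&str>> = vec![Some("a"), None, Some("c")];
    let old: Vec<(u32, &str)> = message_ids
        .iter()
        .zip(by_position)
        .filter_map(|(id, value)| value.map(|value| (*id, value)))
        .collect();

    // New shape: only documents that actually have a value, with the id attached.
    let new: Vec<(u32, &str)> = vec![(10, "a"), (12, "c")];
    assert_eq!(old, new);
}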
View file

@@ -102,10 +102,8 @@ impl<T: SessionStream> SessionData<T> {
// Group messages by thread
let mut threads: AHashMap<u32, Vec<u32>> = AHashMap::new();
let state = mailbox.state.lock();
for (document_id, thread_id) in result_set.results.into_iter().zip(thread_ids) {
if let (Some(thread_id), Some((imap_id, _))) =
(thread_id, state.map_result_id(document_id, is_uid))
{
for (document_id, thread_id) in thread_ids {
if let Some((imap_id, _)) = state.map_result_id(document_id, is_uid) {
threads.entry(thread_id).or_default().push(imap_id);
}
}

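With get_cached_thread_ids returning (document_id, thread_id) pairs, messages without a thread are already filtered out and the remaining work is a plain group-by. A tiny sketch of that grouping step (std HashMap in place of AHashMap):

use std::collections::HashMap;

fn main() {
    // (document_id, thread_id) pairs, as yielded by the cache lookup above.
    let pairs = [(10u32, 1u32), (11, 2), (12, 1)];

    let mut threads: HashMap<u32, Vec<u32>> = HashMap::new();
    for (document_id, thread_id) in pairs {
        threads.entry(thread_id).or_default().push(document_id);
    }
    assert_eq!(threads[&1], vec![10, 12]);
}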
View file

@@ -21,15 +21,20 @@
* for more details.
*/
use jmap_proto::types::{collection::Collection, property::Property};
use store::{ahash::AHashMap, write::ValueClass, ValueKey};
use std::collections::HashMap;
use futures_util::TryFutureExt;
use jmap_proto::{
error::method::MethodError,
types::{collection::Collection, property::Property},
};
use utils::CachedItem;
use crate::JMAP;
#[derive(Debug, Default)]
pub struct Threads {
pub threads: AHashMap<u32, u32>,
pub threads: HashMap<u32, u32>,
pub modseq: Option<u64>,
}
@@ -38,11 +43,19 @@ impl JMAP {
&self,
account_id: u32,
message_ids: impl Iterator<Item = u32>,
) -> store::Result<Vec<Option<u32>>> {
) -> Result<Vec<(u32, u32)>, MethodError> {
// Obtain current state
let modseq = self
.store
.get_last_change_id(account_id, Collection::Thread)
.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
error = ?err,
"Failed to retrieve threads last change id");
MethodError::ServerPartialFail
})
.await?;
// Lock the cache
@@ -53,32 +66,22 @@ impl JMAP {
let mut thread_cache = thread_cache_.get().await;
// Invalidate cache if the modseq has changed
if thread_cache.modseq != modseq {
thread_cache.threads.clear();
if thread_cache.modseq.unwrap_or(0) < modseq.unwrap_or(0) {
thread_cache.threads = self
.get_properties::<u32, _, _>(account_id, Collection::Email, &(), Property::ThreadId)
.await?
.into_iter()
.collect();
thread_cache.modseq = modseq;
}
// Obtain threadIds for matching messages
let mut thread_ids = Vec::with_capacity(message_ids.size_hint().0);
for document_id in message_ids {
if let Some(thread_id) = thread_cache.threads.get(&document_id) {
thread_ids.push((*thread_id).into());
} else if let Some(thread_id) = self
.store
.get_value::<u32>(ValueKey {
account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::ThreadId.into()),
})
.await?
{
thread_ids.push(thread_id.into());
thread_cache.threads.insert(document_id, thread_id);
} else {
thread_ids.push(None);
thread_ids.push((document_id, *thread_id));
}
}
thread_cache.modseq = modseq;
Ok(thread_ids)
}

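The cache now works at the granularity of the whole account: whenever the Thread collection's last change id advances past the cached modseq, the entire document-id to thread-id map is reloaded in one get_properties pass, and individual lookups are then served purely from memory. A rough, self-contained sketch of that invalidation scheme (the two fields mirror the diff; the loader closure is a stand-in for the store call):

use std::collections::HashMap;

struct ThreadCache {
    threads: HashMap<u32, u32>, // document_id -> thread_id
    modseq: Option<u64>,
}

impl ThreadCache {
    // Rebuild the whole map only when the change id has moved forward.
    fn refresh(&mut self, modseq: Option<u64>, load_all: impl FnOnce() -> Vec<(u32, u32)>) {
        if self.modseq.unwrap_or(0) < modseq.unwrap_or(0) {
            self.threads = load_all().into_iter().collect();
            self.modseq = modseq;
        }
    }

    // Serve lookups from memory; ids without a thread are silently skipped.
    fn thread_ids(&self, message_ids: impl Iterator<Item = u32>) -> Vec<(u32, u32)> {
        message_ids
            .filter_map(|id| self.threads.get(&id).map(|t| (id, *t)))
            .collect()
    }
}

fn main() {
    let mut cache = ThreadCache { threads: HashMap::new(), modseq: None };
    cache.refresh(Some(5), || vec![(10, 1), (11, 2)]);
    assert_eq!(cache.thread_ids([10u32, 99].into_iter()), vec![(10, 1)]);
}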
View file

@@ -121,9 +121,8 @@ impl JMAP {
MethodError::ServerPartialFail
})?
.into_iter()
.zip(document_ids)
.filter_map(|(thread_id, document_id)| {
Id::from_parts(thread_id?, document_id).into()
Id::from_parts(thread_id, document_id).into()
})
.collect()
};

View file

@@ -449,14 +449,17 @@ impl JMAP {
})?;
if thread_ids.len() == 1 {
return Ok(thread_ids.into_iter().next().unwrap());
return Ok(thread_ids
.into_iter()
.next()
.map(|(_, thread_id)| thread_id));
}
// Find the most common threadId
let mut thread_counts = VecMap::<u32, u32>::with_capacity(thread_ids.len());
let mut thread_id = u32::MAX;
let mut thread_count = 0;
for thread_id_ in thread_ids.iter().flatten() {
for (_, thread_id_) in thread_ids.iter() {
let tc = thread_counts.get_mut_or_insert(*thread_id_);
*tc += 1;
if *tc > thread_count {
@@ -494,7 +497,11 @@ impl JMAP {
// Move messages to the new threadId
batch.with_collection(Collection::Email);
for old_thread_id in thread_ids.into_iter().flatten().collect::<AHashSet<_>>() {
for old_thread_id in thread_ids
.into_iter()
.map(|(_, thread_id)| thread_id)
.collect::<AHashSet<_>>()
{
if thread_id != old_thread_id {
for document_id in self
.store

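For reference, the loop above keeps a running count per thread id and tracks the current maximum as it goes, so the most common thread id is known without a second pass. The same idea with a std HashMap instead of VecMap:

use std::collections::HashMap;

fn main() {
    // (document_id, thread_id) pairs for the references being merged.
    let thread_ids = [(10u32, 3u32), (11, 5), (12, 3)];

    let mut thread_counts: HashMap<u32, u32> = HashMap::new();
    let mut best_thread_id = u32::MAX;
    let mut best_count = 0;
    for (_, thread_id) in thread_ids {
        let count = thread_counts.entry(thread_id).or_insert(0);
        *count += 1;
        if *count > best_count {
            best_count = *count;
            best_thread_id = thread_id;
        }
    }
    assert_eq!(best_thread_id, 3);
}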
View file

@@ -375,37 +375,34 @@ impl JMAP {
.get_tag(account_id, Collection::Email, Property::Keywords, keyword)
.await?
.unwrap_or_default();
if keyword_doc_ids.is_empty() {
return Ok(keyword_doc_ids);
}
let keyword_thread_ids = self
.get_cached_thread_ids(account_id, keyword_doc_ids.iter())
.await?;
let mut not_matched_ids = RoaringBitmap::new();
let mut matched_ids = RoaringBitmap::new();
for keyword_doc_id in &keyword_doc_ids {
for (keyword_doc_id, thread_id) in keyword_thread_ids {
if matched_ids.contains(keyword_doc_id) || not_matched_ids.contains(keyword_doc_id) {
continue;
}
if let Some(thread_id) = self
.get_property::<u32>(
account_id,
Collection::Email,
keyword_doc_id,
&Property::ThreadId,
)
if let Some(thread_doc_ids) = self
.get_tag(account_id, Collection::Email, Property::ThreadId, thread_id)
.await?
{
if let Some(thread_doc_ids) = self
.get_tag(account_id, Collection::Email, Property::ThreadId, thread_id)
.await?
{
let mut thread_tag_intersection = thread_doc_ids.clone();
thread_tag_intersection &= &keyword_doc_ids;
let mut thread_tag_intersection = thread_doc_ids.clone();
thread_tag_intersection &= &keyword_doc_ids;
if (match_all && thread_tag_intersection == thread_doc_ids)
|| (!match_all && !thread_tag_intersection.is_empty())
{
matched_ids |= &thread_doc_ids;
} else if !thread_tag_intersection.is_empty() {
not_matched_ids |= &thread_tag_intersection;
}
if (match_all && thread_tag_intersection == thread_doc_ids)
|| (!match_all && !thread_tag_intersection.is_empty())
{
matched_ids |= &thread_doc_ids;
} else if !thread_tag_intersection.is_empty() {
not_matched_ids |= &thread_tag_intersection;
}
}
}

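The matching rule in this block is: with match_all, a thread is accepted only if every message in the thread carries the keyword (the intersection equals the thread's bitmap); otherwise one shared message is enough. A hedged sketch of that rule using std sets in place of RoaringBitmap:

use std::collections::BTreeSet;

fn main() {
    let keyword_doc_ids = BTreeSet::from([1u32, 2, 5]);
    let thread_doc_ids = BTreeSet::from([1u32, 2, 3]);
    let intersection: BTreeSet<u32> =
        thread_doc_ids.intersection(&keyword_doc_ids).copied().collect();

    let match_all = true;
    let thread_matches = (match_all && intersection == thread_doc_ids)
        || (!match_all && !intersection.is_empty());

    // Message 3 lacks the keyword, so with match_all the whole thread is rejected.
    assert!(!thread_matches);
}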
View file

@@ -49,8 +49,11 @@ use store::{
fts::FtsFilter,
query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet},
roaring::RoaringBitmap,
write::{BatchBuilder, BitmapClass, DirectoryClass, TagValue, ValueClass},
BitmapKey, BlobStore, Deserialize, FtsStore, LookupStore, Store, Stores, ValueKey,
write::{
key::DeserializeBigEndian, BatchBuilder, BitmapClass, DirectoryClass, TagValue, ValueClass,
},
BitmapKey, BlobStore, Deserialize, FtsStore, IterateParams, LookupStore, Store, Stores,
ValueKey, U32_LEN,
};
use tokio::sync::mpsc;
use utils::{
@@ -451,44 +454,62 @@ impl JMAP {
}
}
pub async fn get_properties<U>(
pub async fn get_properties<U, I, P>(
&self,
account_id: u32,
collection: Collection,
document_ids: impl Iterator<Item = u32>,
property: impl AsRef<Property>,
) -> Result<Vec<Option<U>>, MethodError>
iterate: &I,
property: P,
) -> Result<Vec<(u32, U)>, MethodError>
where
I: PropertiesIterator + Send + Sync,
P: AsRef<Property>,
U: Deserialize + 'static,
{
let property = property.as_ref();
let property: u8 = property.as_ref().into();
let collection: u8 = collection.into();
let expected_results = iterate.len();
let mut results = Vec::with_capacity(expected_results);
match self
.store
.get_values::<U>(
document_ids
.map(|document_id| ValueKey {
self.store
.iterate(
IterateParams::new(
ValueKey {
account_id,
collection: collection.into(),
document_id,
class: ValueClass::Property(property.into()),
})
.collect(),
collection,
document_id: iterate.min(),
class: ValueClass::Property(property),
},
ValueKey {
account_id,
collection,
document_id: iterate.max(),
class: ValueClass::Property(property),
},
),
|key, value| {
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if iterate.contains(document_id) {
results.push((document_id, U::deserialize(value)?));
Ok(expected_results == 0 || results.len() < expected_results)
} else {
Ok(true)
}
},
)
.await
{
Ok(value) => Ok(value),
Err(err) => {
.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = ?collection,
property = ?property,
error = ?err,
"Failed to retrieve properties");
Err(MethodError::ServerPartialFail)
}
}
context = "store",
account_id = account_id,
collection = ?collection,
property = ?property,
error = ?err,
"Failed to retrieve properties");
MethodError::ServerPartialFail
})?;
Ok(results)
}
pub async fn get_document_ids(
@@ -770,3 +791,47 @@ impl UpdateResults for QueryResponse {
}
}
}
#[allow(clippy::len_without_is_empty)]
pub trait PropertiesIterator {
fn min(&self) -> u32;
fn max(&self) -> u32;
fn contains(&self, id: u32) -> bool;
fn len(&self) -> usize;
}
impl PropertiesIterator for RoaringBitmap {
fn min(&self) -> u32 {
self.min().unwrap_or(0)
}
fn max(&self) -> u32 {
self.max().map(|m| m + 1).unwrap_or(0)
}
fn contains(&self, id: u32) -> bool {
self.contains(id)
}
fn len(&self) -> usize {
self.len() as usize
}
}
impl PropertiesIterator for () {
fn min(&self) -> u32 {
0
}
fn max(&self) -> u32 {
u32::MAX
}
fn contains(&self, _: u32) -> bool {
true
}
fn len(&self) -> usize {
0
}
}

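This is the core of the commit: get_properties no longer issues one ValueKey lookup per document but iterates the key range bounded by the selector's min() and max(), extracts the document id from the big-endian u32 suffix of each key, keeps only ids the selector contains(), and stops early once len() results have been collected (len() == 0, as in the () impl, means scan everything). A self-contained sketch of that contract under those assumptions, with a BTreeMap standing in for the store and a BTreeSet standing in for RoaringBitmap:

use std::collections::{BTreeMap, BTreeSet};

// Illustrative analogue of the PropertiesIterator contract in the diff above.
trait DocSelector {
    fn min(&self) -> u32;
    fn max(&self) -> u32; // upper bound of the scan, one past the last id
    fn contains(&self, id: u32) -> bool;
    fn len(&self) -> usize; // 0 means "unknown, never stop early"
}

impl DocSelector for BTreeSet<u32> {
    fn min(&self) -> u32 { self.first().copied().unwrap_or(0) }
    fn max(&self) -> u32 { self.last().map(|m| m + 1).unwrap_or(0) }
    fn contains(&self, id: u32) -> bool { BTreeSet::contains(self, &id) }
    fn len(&self) -> usize { BTreeSet::len(self) }
}

// () selects every document in the collection.
impl DocSelector for () {
    fn min(&self) -> u32 { 0 }
    fn max(&self) -> u32 { u32::MAX }
    fn contains(&self, _: u32) -> bool { true }
    fn len(&self) -> usize { 0 }
}

// One ranged scan replaces N point lookups.
fn get_properties(store: &BTreeMap<u32, String>, selector: &impl DocSelector) -> Vec<(u32, String)> {
    let expected = selector.len();
    let mut results = Vec::with_capacity(expected);
    for (&id, value) in store.range(selector.min()..selector.max()) {
        if selector.contains(id) {
            results.push((id, value.clone()));
            if expected != 0 && results.len() >= expected {
                break; // every requested document has been seen
            }
        }
    }
    results
}

fn main() {
    let store = BTreeMap::from([(1u32, "a".to_string()), (3, "b".into()), (7, "c".into())]);
    assert_eq!(get_properties(&store, &BTreeSet::from([1u32, 7])).len(), 2);
    assert_eq!(get_properties(&store, &()).len(), 3); // () selects every document
}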
View file

@@ -272,8 +272,7 @@ impl JMAP {
MethodError::ServerPartialFail
})?
.into_iter()
.flatten()
.for_each(|thread_id| {
.for_each(|(_, thread_id)| {
thread_ids.insert(thread_id);
});
Ok(thread_ids.len())

View file

@@ -119,20 +119,19 @@ impl JMAP {
&& (paginate.is_some()
|| (response.total.map_or(false, |total| total > 0) && filter_as_tree))
{
for document_id in mailbox_ids {
let parent_id = self
.get_property::<Object<Value>>(
account_id,
Collection::Mailbox,
document_id,
Property::Value,
)
.await?
.and_then(|o| {
o.properties
.get(&Property::ParentId)
.and_then(|id| id.as_id().map(|id| id.document_id()))
})
for (document_id, value) in self
.get_properties::<Object<Value>, _, _>(
account_id,
Collection::Mailbox,
&mailbox_ids,
Property::Value,
)
.await?
{
let parent_id = value
.properties
.get(&Property::ParentId)
.and_then(|id| id.as_id().map(|id| id.document_id()))
.unwrap_or(0);
hierarchy.insert(document_id + 1, parent_id);
tree.entry(parent_id)

View file

@@ -32,7 +32,8 @@ jemallocator = "0.5.0"
[features]
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["foundationdb", "postgres"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]

View file

@@ -74,12 +74,7 @@ impl PostgresStore {
pub(super) async fn create_tables(&self) -> crate::Result<()> {
let conn = self.conn_pool.get().await?;
for table in [
SUBSPACE_VALUES,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
SUBSPACE_BITMAPS,
] {
for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] {
let table = char::from(table);
conn.execute(
&format!(
@@ -93,7 +88,7 @@ impl PostgresStore {
.await?;
}
for table in [SUBSPACE_INDEXES] {
for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] {
let table = char::from(table);
conn.execute(
&format!(

View file

@@ -56,19 +56,6 @@ impl Store {
}
}
pub async fn get_values<U>(&self, key: Vec<impl Key>) -> crate::Result<Vec<Option<U>>>
where
U: Deserialize + 'static,
{
let mut results = Vec::with_capacity(key.len());
for key in key {
results.push(self.get_value(key).await?);
}
Ok(results)
}
pub async fn get_bitmap(
&self,
key: BitmapKey<BitmapClass>,

View file

@@ -244,8 +244,8 @@ impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
.write(0u8)
.write(self.account_id)
.write(self.collection)
.write_leb128(self.document_id)
.write(*field),
.write(*field)
.write(self.document_id),
ValueClass::TermIndex => serializer
.write(1u8)
.write(self.account_id)

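Reordering the key so the document id is written last (and, judging by the deserialize_be_u32 call in get_properties, as a fixed-width big-endian u32 rather than LEB128) is what makes the ranged scan possible: all values of one property for an account and collection become a contiguous, id-ordered key range, and the id can be read back from a fixed offset at the end of the key. A small sketch of that idea, with an assumed field layout for illustration only:

// Assumed layout for illustration: account_id | collection | field | document_id (BE u32 suffix).
fn encode_key(account_id: u32, collection: u8, field: u8, document_id: u32) -> Vec<u8> {
    let mut key = Vec::with_capacity(10);
    key.extend_from_slice(&account_id.to_be_bytes());
    key.push(collection);
    key.push(field);
    key.extend_from_slice(&document_id.to_be_bytes()); // fixed-width suffix, last component
    key
}

// Same idea as key.deserialize_be_u32(key.len() - U32_LEN) in the diff.
fn document_id(key: &[u8]) -> u32 {
    u32::from_be_bytes(key[key.len() - 4..].try_into().unwrap())
}

fn main() {
    let key = encode_key(1, 0, 5, 42);
    assert_eq!(document_id(&key), 42);
    // Big-endian encoding keeps keys sorted by document id within the same prefix.
    assert!(encode_key(1, 0, 5, 2) < encode_key(1, 0, 5, 300));
}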
View file

@@ -28,7 +28,10 @@ use std::{borrow::Borrow, io::Write};
pub trait Leb128_ {
fn to_leb128_writer(self, out: &mut impl Write) -> std::io::Result<usize>;
fn to_leb128_bytes(self, out: &mut Vec<u8>);
fn from_leb128_bytes(slice: &[u8]) -> Option<(Self, usize)>
fn from_leb128_bytes_pos(slice: &[u8]) -> Option<(Self, usize)>
where
Self: std::marker::Sized;
fn from_leb128_bytes(slice: &[u8]) -> Option<Self>
where
Self: std::marker::Sized;
fn from_leb128_it<T, I>(it: T) -> Option<Self>
@@ -79,7 +82,7 @@
pub trait Leb128Reader: AsRef<[u8]> {
#[inline(always)]
fn read_leb128<T: Leb128_>(&self) -> Option<(T, usize)> {
T::from_leb128_bytes(self.as_ref())
T::from_leb128_bytes_pos(self.as_ref())
}
#[inline(always)]
@@ -133,7 +136,7 @@ macro_rules! impl_unsigned_leb128 {
}
#[inline(always)]
fn from_leb128_bytes(slice: &[u8]) -> Option<($int_ty, usize)> {
fn from_leb128_bytes_pos(slice: &[u8]) -> Option<($int_ty, usize)> {
let mut result = 0;
for (shift, (pos, &byte)) in $shifts.into_iter().zip(slice.iter().enumerate()) {
@@ -148,6 +151,22 @@ macro_rules! impl_unsigned_leb128 {
None
}
#[inline(always)]
fn from_leb128_bytes(slice: &[u8]) -> Option<$int_ty> {
let mut result = 0;
for (shift, &byte) in $shifts.into_iter().zip(slice.iter()) {
if (byte & 0x80) == 0 {
result |= (byte as $int_ty) << shift;
return Some(result);
} else {
result |= ((byte & 0x7F) as $int_ty) << shift;
}
}
None
}
#[inline(always)]
fn from_leb128_it<T, I>(it: T) -> Option<$int_ty>
where

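The macro now generates two decoders: from_leb128_bytes_pos, which still returns the value together with the number of bytes consumed, and a new from_leb128_bytes that returns only the value for callers that do not care about the position. A standalone sketch of the value-only decoder, specialized to u32:

// Value-only LEB128 decode for u32; returns None on truncated or overlong input.
fn from_leb128_bytes(slice: &[u8]) -> Option<u32> {
    let mut result = 0u32;
    for (shift, &byte) in [0u32, 7, 14, 21, 28].into_iter().zip(slice) {
        if byte & 0x80 == 0 {
            return Some(result | ((byte as u32) << shift));
        }
        result |= ((byte & 0x7F) as u32) << shift;
    }
    None
}

fn main() {
    // 300 encodes as [0xAC, 0x02] in LEB128.
    assert_eq!(from_leb128_bytes(&[0xAC, 0x02]), Some(300));
    assert_eq!(from_leb128_bytes(&[0x80]), None); // continuation bit set, but input ends
}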
View file

@@ -6,7 +6,8 @@ resolver = "2"
[features]
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["foundationdb", "postgres"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]

View file

@@ -56,7 +56,6 @@ pub async fn test(db: Store) {
BatchBuilder::new()
.with_account_id(0)
.with_collection(0)
.with_account_id(0)
.update_document(0)
.set(ValueClass::Property(1), value.as_slice())
.set(ValueClass::Property(0), "check1")
@@ -86,7 +85,6 @@ pub async fn test(db: Store) {
BatchBuilder::new()
.with_account_id(0)
.with_collection(0)
.with_account_id(0)
.update_document(0)
.clear(ValueClass::Property(1))
.build_batch(),

View file

@@ -459,26 +459,21 @@ pub async fn test_filter(db: Store, fts: FtsStore) {
.await
.unwrap();
assert_eq!(
db.get_values::<String>(
sorted_docset
.ids
.into_iter()
.map(|document_id| ValueKey {
account_id: 0,
collection: COLLECTION_ID,
document_id: document_id as u32,
class: ValueClass::Property(fields_u8["accession_number"])
})
.collect()
)
.await
.unwrap()
.into_iter()
.flatten()
.collect::<Vec<_>>(),
expected_results
);
let mut results = Vec::new();
for document_id in sorted_docset.ids {
results.push(
db.get_value::<String>(ValueKey {
account_id: 0,
collection: COLLECTION_ID,
document_id: document_id as u32,
class: ValueClass::Property(fields_u8["accession_number"]),
})
.await
.unwrap()
.unwrap(),
);
}
assert_eq!(results, expected_results);
}
}
@@ -554,25 +549,20 @@ pub async fn test_sort(db: Store) {
.await
.unwrap();
assert_eq!(
db.get_values::<String>(
sorted_docset
.ids
.into_iter()
.map(|document_id| ValueKey {
account_id: 0,
collection: COLLECTION_ID,
document_id: document_id as u32,
class: ValueClass::Property(fields["accession_number"])
})
.collect()
)
.await
.unwrap()
.into_iter()
.flatten()
.collect::<Vec<_>>(),
expected_results
);
let mut results = Vec::new();
for document_id in sorted_docset.ids {
results.push(
db.get_value::<String>(ValueKey {
account_id: 0,
collection: COLLECTION_ID,
document_id: document_id as u32,
class: ValueClass::Property(fields["accession_number"]),
})
.await
.unwrap()
.unwrap(),
);
}
assert_eq!(results, expected_results);
}
}