Store keys refactoring

This commit is contained in:
mdecimus 2023-11-09 16:21:39 +01:00
parent fe5a9e8ce8
commit 0e25f3d12c
41 changed files with 1191 additions and 1020 deletions

View file

@ -36,7 +36,6 @@ use jmap_proto::types::{collection::Collection, id::Id, keyword::Keyword, proper
use mail_parser::HeaderName;
use nlp::language::Language;
use store::{
fts::builder::MAX_TOKEN_LENGTH,
query::{
self,
log::Query,

View file

@ -34,7 +34,7 @@ use imap_proto::{
};
use jmap_proto::types::{collection::Collection, property::Property};
use store::{StoreRead, ValueKey};
use store::{write::ValueClass, StoreRead, ValueKey};
use tokio::io::AsyncRead;
use crate::core::{SelectedMailbox, Session, SessionData};
@ -92,13 +92,11 @@ impl SessionData {
result_set
.results
.iter()
.map(|document_id| {
ValueKey::new(
mailbox.id.account_id,
Collection::Email,
document_id,
Property::ThreadId,
)
.map(|document_id| ValueKey {
account_id: mailbox.id.account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::ThreadId.into()),
})
.collect(),
)

View file

@ -24,9 +24,11 @@
use std::{borrow::Cow, collections::HashSet};
use store::{
fts::builder::ToTokens,
write::{assert::HashedValue, BatchBuilder, BitmapFamily, IntoOperations, Operation},
Serialize, HASH_EXACT,
write::{
assert::HashedValue, BatchBuilder, BitmapClass, IntoOperations, Operation, TagValue,
TokenizeText, ValueClass, ValueOp,
},
Serialize,
};
use crate::{
@ -58,7 +60,6 @@ pub enum IndexAs {
LongInteger,
HasProperty,
Acl,
Quota,
#[default]
None,
}
@ -138,6 +139,14 @@ impl ObjectIndexBuilder {
Ok(self)
}
pub fn changes(&self) -> Option<&Object<Value>> {
self.changes.as_ref()
}
pub fn current(&self) -> Option<&HashedValue<Object<Value>>> {
self.current.as_ref()
}
}
impl IntoOperations for ObjectIndexBuilder {
@ -146,10 +155,7 @@ impl IntoOperations for ObjectIndexBuilder {
(None, Some(changes)) => {
// Insertion
build_batch(batch, self.index, &changes, true);
batch.ops.push(Operation::Value {
class: Property::Value.into(),
set: changes.serialize().into(),
});
batch.set(Property::Value, changes.serialize());
}
(Some(current), Some(changes)) => {
// Update
@ -160,10 +166,7 @@ impl IntoOperations for ObjectIndexBuilder {
// Deletion
batch.assert_value(Property::Value, &current);
build_batch(batch, self.index, &current.inner, false);
batch.ops.push(Operation::Value {
class: Property::Value.into(),
set: None,
});
batch.clear(Property::Value);
}
(None, None) => unreachable!(),
}
@ -202,7 +205,7 @@ fn merge_batch(
});
}
if tokenize {
remove_tokens = text.to_tokens();
text.tokenize_into(&mut remove_tokens);
}
}
@ -225,14 +228,16 @@ fn merge_batch(
}
// Update tokens
let field: u8 = property.clone().into();
for (token, set) in [(add_tokens, true), (remove_tokens, false)] {
for token in token {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
property.clone().into(),
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
token: token.into(),
},
set,
));
});
}
}
}
@ -250,7 +255,7 @@ fn merge_batch(
remove_values.insert(text);
}
if tokenize {
remove_tokens.extend(text.to_tokens());
text.tokenize_into(&mut remove_tokens);
}
}
}
@ -286,14 +291,16 @@ fn merge_batch(
}
// Update tokens
let field: u8 = property.clone().into();
for (token, set) in [(add_tokens, true), (remove_tokens, false)] {
for token in token {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
property.clone().into(),
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
token: token.into_bytes(),
},
set,
));
});
}
}
}
@ -347,16 +354,18 @@ fn merge_batch(
IndexAs::HasProperty => {
if current_value == &Value::Null {
batch.ops.push(Operation::Bitmap {
family: ().family(),
field: property.clone().into(),
key: vec![],
class: BitmapClass::Tag {
field: property.clone().into(),
value: ().into(),
},
set: true,
});
} else if value == Value::Null {
batch.ops.push(Operation::Bitmap {
family: ().family(),
field: property.clone().into(),
key: vec![],
class: BitmapClass::Tag {
field: property.clone().into(),
value: ().into(),
},
set: false,
});
}
@ -426,14 +435,6 @@ fn merge_batch(
_ => {}
}
}
IndexAs::Quota => {
if let Some(current_value) = current_value.try_cast_uint() {
batch.quota(-(current_value as i64));
}
if let Some(value) = value.try_cast_uint() {
batch.quota(value as i64);
}
}
IndexAs::None => (),
}
}
@ -448,7 +449,7 @@ fn merge_batch(
if has_changes {
batch.ops.push(Operation::Value {
class: Property::Value.into(),
set: current.serialize().into(),
op: ValueOp::Set(current.serialize()),
});
}
}
@ -470,13 +471,15 @@ fn build_batch(
});
}
if tokenize {
for token in text.to_tokens() {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
(&item.property).into(),
let field: u8 = (&item.property).into();
for token in text.as_str().to_tokens() {
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
token: token.into_bytes(),
},
set,
));
});
}
}
}
@ -493,20 +496,22 @@ fn build_batch(
}
}
}
let field: u8 = (&item.property).into();
for text in indexes {
batch.ops.push(Operation::Index {
field: (&item.property).into(),
field,
key: text.serialize(),
set,
});
}
for token in tokens {
batch.ops.push(Operation::hash(
&token,
HASH_EXACT,
(&item.property).into(),
batch.ops.push(Operation::Bitmap {
class: BitmapClass::Text {
field,
token: token.into_bytes(),
},
set,
));
});
}
}
(Value::UnsignedInt(integer), IndexAs::Integer | IndexAs::LongInteger) => {
@ -559,16 +564,12 @@ fn build_batch(
}
}
}
(Value::UnsignedInt(bytes), IndexAs::Quota) => {
batch.ops.push(Operation::UpdateQuota {
bytes: if set { *bytes as i64 } else { -(*bytes as i64) },
});
}
(value, IndexAs::HasProperty) if value != &Value::Null => {
batch.ops.push(Operation::Bitmap {
family: ().family(),
field: (&item.property).into(),
key: vec![],
class: BitmapClass::Tag {
field: (&item.property).into(),
value: ().into(),
},
set,
});
}
@ -636,3 +637,18 @@ impl IntoIndex for &Id {
}
}
}
impl From<Property> for ValueClass {
fn from(value: Property) -> Self {
ValueClass::Property(value.into())
}
}
impl From<Property> for BitmapClass {
fn from(value: Property) -> Self {
BitmapClass::Tag {
field: value.into(),
value: TagValue::Static(0),
}
}
}

View file

@ -31,7 +31,7 @@ pub mod sieve;
use std::slice::Iter;
use store::{
write::{DeserializeFrom, SerializeInto, ToBitmaps, ValueClass},
write::{DeserializeFrom, SerializeInto, ToBitmaps},
Deserialize, Serialize,
};
use utils::{
@ -262,12 +262,3 @@ impl DeserializeFrom for Value {
}
}
}
impl From<Property> for ValueClass {
fn from(value: Property) -> Self {
ValueClass::Property {
field: value.into(),
family: 0,
}
}
}

View file

@ -24,8 +24,8 @@
use std::fmt::Display;
use store::{
write::{BitmapFamily, DeserializeFrom, Operation, SerializeInto, ToBitmaps},
Serialize, BM_TAG, TAG_STATIC, TAG_TEXT,
write::{BitmapClass, DeserializeFrom, Operation, SerializeInto, TagValue, ToBitmaps},
Serialize,
};
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
@ -181,22 +181,13 @@ impl Display for Keyword {
}
}
impl BitmapFamily for Keyword {
fn family(&self) -> u8 {
if matches!(self, Keyword::Other(_)) {
BM_TAG | TAG_TEXT
} else {
BM_TAG | TAG_STATIC
}
}
}
impl ToBitmaps for Keyword {
fn to_bitmaps(&self, ops: &mut Vec<store::write::Operation>, field: u8, set: bool) {
ops.push(Operation::Bitmap {
family: self.family(),
field,
key: self.serialize(),
class: BitmapClass::Tag {
field,
value: self.into(),
},
set,
});
}
@ -293,3 +284,43 @@ impl DeserializeFrom for Keyword {
}
}
}
impl From<Keyword> for TagValue {
fn from(value: Keyword) -> Self {
match value {
Keyword::Seen => TagValue::Static(SEEN as u8),
Keyword::Draft => TagValue::Static(DRAFT as u8),
Keyword::Flagged => TagValue::Static(FLAGGED as u8),
Keyword::Answered => TagValue::Static(ANSWERED as u8),
Keyword::Recent => TagValue::Static(RECENT as u8),
Keyword::Important => TagValue::Static(IMPORTANT as u8),
Keyword::Phishing => TagValue::Static(PHISHING as u8),
Keyword::Junk => TagValue::Static(JUNK as u8),
Keyword::NotJunk => TagValue::Static(NOTJUNK as u8),
Keyword::Deleted => TagValue::Static(DELETED as u8),
Keyword::Forwarded => TagValue::Static(FORWARDED as u8),
Keyword::MdnSent => TagValue::Static(MDN_SENT as u8),
Keyword::Other(string) => TagValue::Text(string.into_bytes()),
}
}
}
impl From<&Keyword> for TagValue {
fn from(value: &Keyword) -> Self {
match value {
Keyword::Seen => TagValue::Static(SEEN as u8),
Keyword::Draft => TagValue::Static(DRAFT as u8),
Keyword::Flagged => TagValue::Static(FLAGGED as u8),
Keyword::Answered => TagValue::Static(ANSWERED as u8),
Keyword::Recent => TagValue::Static(RECENT as u8),
Keyword::Important => TagValue::Static(IMPORTANT as u8),
Keyword::Phishing => TagValue::Static(PHISHING as u8),
Keyword::Junk => TagValue::Static(JUNK as u8),
Keyword::NotJunk => TagValue::Static(NOTJUNK as u8),
Keyword::Deleted => TagValue::Static(DELETED as u8),
Keyword::Forwarded => TagValue::Static(FORWARDED as u8),
Keyword::MdnSent => TagValue::Static(MDN_SENT as u8),
Keyword::Other(string) => TagValue::Text(string.as_bytes().to_vec()),
}
}
}

View file

@ -26,11 +26,11 @@ use jmap_proto::{
types::{collection::Collection, property::Property, value::Value},
};
use store::{
write::{assert::HashedValue, BatchBuilder, Operation, ValueClass},
write::{assert::HashedValue, BatchBuilder, ValueClass},
BitmapKey, Serialize, StorePurge, StoreRead, StoreWrite, ValueKey,
};
use crate::{auth::authenticate::AccountKey, mailbox::set::SCHEMA, JMAP};
use crate::{mailbox::set::SCHEMA, NamedKey, JMAP};
impl JMAP {
pub async fn delete_account(&self, account_name: &str, account_id: u32) -> store::Result<()> {
@ -42,18 +42,9 @@ impl JMAP {
batch
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::name_to_id(account_name),
},
set: None,
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::id_to_name(account_id),
},
set: None,
})
.clear(NamedKey::Name(account_name))
.clear(NamedKey::Id::<&[u8]>(account_id))
.clear(NamedKey::Quota::<&[u8]>(account_id))
.with_account_id(account_id)
.with_collection(Collection::Mailbox);
for mailbox_id in self
@ -64,12 +55,12 @@ impl JMAP {
{
let mailbox = self
.store
.get_value::<HashedValue<Object<Value>>>(ValueKey::new(
.get_value::<HashedValue<Object<Value>>>(ValueKey {
account_id,
Collection::Mailbox,
mailbox_id,
Property::Value,
))
collection: Collection::Mailbox.into(),
document_id: mailbox_id,
class: ValueClass::Property(Property::Value.into()),
})
.await?
.ok_or_else(|| {
store::Error::InternalError(format!("Mailbox {} not found", mailbox_id))
@ -102,24 +93,12 @@ impl JMAP {
batch
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::name_to_id(account_name),
},
set: None,
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::name_to_id(new_account_name),
},
set: account_id.serialize().into(),
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::id_to_name(account_id),
},
set: new_account_name.serialize().into(),
});
.clear(NamedKey::Name(account_name))
.set(
NamedKey::Id::<&[u8]>(account_id),
new_account_name.serialize(),
)
.set(NamedKey::Name(new_account_name), account_id.serialize());
self.store.write(batch.build()).await?;
Ok(())
}

View file

@ -33,8 +33,12 @@ use jmap_proto::{
};
use store::{
roaring::RoaringBitmap,
write::{assert::HashedValue, key::DeserializeBigEndian},
AclKey, Deserialize, Error, StoreRead,
write::{
assert::HashedValue,
key::{AclKey, DeserializeBigEndian},
ValueClass,
},
Deserialize, Error, StoreRead, ValueKey,
};
use utils::map::bitmap::{Bitmap, BitmapItem};
@ -48,17 +52,17 @@ impl JMAP {
.iter()
.chain(access_token.member_of.clone().iter())
{
let from_key = AclKey {
grant_account_id,
to_account_id: 0,
to_collection: 0,
to_document_id: 0,
let from_key = ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::Acl(grant_account_id),
};
let to_key = AclKey {
grant_account_id,
to_account_id: u32::MAX,
to_collection: u8::MAX,
to_document_id: u32::MAX,
let to_key = ValueKey {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::Acl(grant_account_id),
};
match self
.store
@ -132,14 +136,14 @@ impl JMAP {
.iter()
.chain(access_token.member_of.clone().iter())
{
let from_key = AclKey {
grant_account_id,
to_account_id,
to_collection,
to_document_id: 0,
let from_key = ValueKey {
account_id: to_account_id,
collection: to_collection,
document_id: 0,
class: ValueClass::Acl(grant_account_id),
};
let mut to_key = from_key;
to_key.to_document_id = u32::MAX;
let mut to_key = from_key.clone();
to_key.document_id = u32::MAX;
match self
.store
@ -258,11 +262,11 @@ impl JMAP {
{
match self
.store
.get_value::<u64>(AclKey {
grant_account_id,
to_account_id,
to_collection,
to_document_id,
.get_value::<u64>(ValueKey {
account_id: to_account_id,
collection: to_collection,
document_id: to_document_id,
class: ValueClass::Acl(grant_account_id),
})
.await
{

View file

@ -34,13 +34,10 @@ use jmap_proto::{
};
use mail_parser::decoders::base64::base64_decode;
use mail_send::Credentials;
use store::{
write::{key::KeySerializer, BatchBuilder, Operation, ValueClass},
CustomValueKey, Serialize, StoreRead, StoreWrite,
};
use store::{write::BatchBuilder, Serialize, StoreRead, StoreWrite};
use utils::{listener::limiter::InFlight, map::ttl_dashmap::TtlMap};
use crate::JMAP;
use crate::{NamedKey, JMAP};
use super::{rate_limit::RemoteAddress, AccessToken};
@ -155,9 +152,7 @@ impl JMAP {
pub async fn try_get_account_id(&self, name: &str) -> Result<Option<u32>, MethodError> {
self.store
.get_value::<u32>(CustomValueKey {
value: AccountKey::name_to_id(name),
})
.get_value::<u32>(NamedKey::Name(name))
.await
.map_err(|err| {
tracing::error!(event = "error",
@ -183,26 +178,15 @@ impl JMAP {
.assign_document_id(u32::MAX, Collection::Principal)
.await?;
// Serialize key
let key = AccountKey::name_to_id(name);
// Write account ID
let mut batch = BatchBuilder::new();
batch
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.create_document(account_id)
.assert_value(ValueClass::Custom { bytes: key.clone() }, ())
.op(Operation::Value {
class: ValueClass::Custom { bytes: key },
set: account_id.serialize().into(),
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::id_to_name(account_id),
},
set: name.serialize().into(),
});
.assert_value(NamedKey::Name(name), ())
.set(NamedKey::Name(name), account_id.serialize())
.set(NamedKey::Id::<&[u8]>(account_id), name.serialize());
match self.store.write(batch.build()).await {
Ok(_) => {
@ -233,9 +217,7 @@ impl JMAP {
pub async fn get_account_name(&self, account_id: u32) -> Result<Option<String>, MethodError> {
self.store
.get_value::<String>(CustomValueKey {
value: AccountKey::id_to_name(account_id),
})
.get_value::<String>(NamedKey::Id::<&[u8]>(account_id))
.await
.map_err(|err| {
tracing::error!(event = "error",
@ -329,22 +311,3 @@ impl JMAP {
}
}
}
pub struct AccountKey();
impl AccountKey {
pub fn name_to_id(name: &str) -> Vec<u8> {
KeySerializer::new(name.len() + std::mem::size_of::<u32>() + 1)
.write(u32::MAX)
.write(0u8)
.write(name)
.finalize()
}
pub fn id_to_name(id: u32) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<u32>() * 2 + 1)
.write(u32::MAX)
.write(1u8)
.write(id)
.finalize()
}
}

View file

@ -48,8 +48,6 @@ use jmap_proto::{
};
use mail_parser::{parsers::fields::thread::thread_name, HeaderName, HeaderValue};
use store::{
fts::term_index::TokenIndex,
query::RawValue,
write::{BatchBuilder, F_BITMAP, F_VALUE},
BlobKind, StoreWrite,
};
@ -294,22 +292,16 @@ impl JMAP {
received_at: Option<UTCDate>,
) -> Result<Result<IngestedEmail, SetError>, MethodError> {
// Obtain term index and metadata
let (mut metadata, token_index) = if let (Some(metadata), Some(token_index)) = (
self.get_property::<Bincode<MessageMetadata>>(
let mut metadata = if let Some(metadata) = self
.get_property::<Bincode<MessageMetadata>>(
from_account_id,
Collection::Email,
from_message_id,
Property::BodyStructure,
)
.await?,
self.get_term_index::<RawValue<TokenIndex>>(
from_account_id,
Collection::Email,
from_message_id,
)
.await?,
) {
(metadata.inner, token_index)
.await?
{
metadata.inner
} else {
return Ok(Err(SetError::not_found().with_description(format!(
"Message not found not found in account {}.",
@ -432,7 +424,6 @@ impl JMAP {
.value(Property::Keywords, keywords, F_VALUE | F_BITMAP)
.value(Property::Cid, changes.change_id, F_VALUE)
.custom(EmailIndexBuilder::set(metadata))
.custom(token_index)
.custom(changes);
self.store.write(batch.build()).await.map_err(|err| {

View file

@ -31,12 +31,9 @@ use mail_parser::{
PartType,
};
use nlp::language::Language;
use store::{
fts::builder::{FtsIndexBuilder, MAX_TOKEN_LENGTH},
write::{BatchBuilder, IntoOperations, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE},
};
use store::write::{BatchBuilder, IntoOperations, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE};
use crate::Bincode;
use crate::{Bincode, NamedKey};
use super::metadata::MessageMetadata;
@ -83,8 +80,12 @@ impl IndexMessage for BatchBuilder {
self.value(Property::MailboxIds, mailbox_ids, F_VALUE | F_BITMAP);
// Index size
let account_id = self.last_account_id().unwrap();
self.value(Property::Size, message.raw_message.len() as u32, F_INDEX)
.quota(message.raw_message.len() as i64);
.add(
NamedKey::Quota::<&[u8]>(account_id),
message.raw_message.len() as i64,
);
// Index receivedAt
self.value(Property::ReceivedAt, received_at, F_INDEX);
@ -138,14 +139,9 @@ impl IndexMessage for BatchBuilder {
// Store and index hasAttachment property
if has_attachments {
self.bitmap(Property::HasAttachment, (), 0);
self.tag(Property::HasAttachment, (), 0);
}
// FTS index
let mut fts = FtsIndexBuilder::with_default_language(Language::English);
fts.index_message(&message);
self.custom(fts);
// Store message metadata
self.value(
Property::BodyStructure,
@ -257,6 +253,7 @@ impl IndexMessage for BatchBuilder {
}
}
/*
impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
fn index_message(&mut self, message: &'x Message<'x>) {
let mut language = Language::Unknown;
@ -379,6 +376,7 @@ impl<'x> IndexMessageText<'x> for FtsIndexBuilder<'x, Property> {
}
}
}
*/
pub struct EmailIndexBuilder<'x> {
inner: Bincode<MessageMetadata<'x>>,
@ -415,20 +413,24 @@ impl<'x> IntoOperations for EmailIndexBuilder<'x> {
let metadata = &self.inner.inner;
// Index properties
let account_id = batch.last_account_id().unwrap();
batch
.value(Property::Size, metadata.size as u32, F_INDEX | options)
.quota(if self.set {
metadata.size as i64
} else {
-(metadata.size as i64)
});
.add(
NamedKey::Quota::<&[u8]>(account_id),
if self.set {
metadata.size as i64
} else {
-(metadata.size as i64)
},
);
batch.value(
Property::ReceivedAt,
metadata.received_at,
F_INDEX | options,
);
if metadata.has_attachments {
batch.bitmap(Property::HasAttachment, (), options);
batch.tag(Property::HasAttachment, (), options);
}
batch.index_headers(&metadata.contents.parts[0].headers, options);
}

View file

@ -36,7 +36,10 @@ use mail_parser::{
use store::{
ahash::AHashSet,
query::{filter::StoreQuery, Filter},
write::{log::ChangeLogBuilder, now, BatchBuilder, F_BITMAP, F_CLEAR, F_VALUE},
write::{
log::ChangeLogBuilder, now, BatchBuilder, BitmapClass, TagValue, ValueClass, F_BITMAP,
F_CLEAR, F_VALUE,
},
BitmapKey, StoreId, StoreRead, StoreWrite, ValueKey,
};
use utils::map::vec_map::VecMap;
@ -380,13 +383,11 @@ impl JMAP {
.get_values::<u32>(
results
.iter()
.map(|document_id| {
ValueKey::new(
account_id,
Collection::Email,
document_id,
Property::ThreadId,
)
.map(|document_id| ValueKey {
account_id,
collection: Collection::Email.into(),
document_id,
class: ValueClass::Property(Property::ThreadId.into()),
})
.collect(),
)
@ -454,12 +455,15 @@ impl JMAP {
if thread_id != old_thread_id {
for document_id in self
.store
.get_bitmap(BitmapKey::value(
.get_bitmap(BitmapKey {
account_id,
Collection::Email,
Property::ThreadId,
old_thread_id,
))
collection: Collection::Email.into(),
class: BitmapClass::Tag {
field: Property::ThreadId.into(),
value: TagValue::Id(old_thread_id),
},
block_num: 0,
})
.await
.map_err(|err| {
tracing::error!(

View file

@ -27,12 +27,10 @@ use jmap_proto::{
object::email::QueryArguments,
types::{acl::Acl, collection::Collection, keyword::Keyword, property::Property},
};
use mail_parser::HeaderName;
use nlp::language::Language;
use store::{
fts::builder::MAX_TOKEN_LENGTH,
query::{self},
roaring::RoaringBitmap,
write::ValueClass,
ValueKey,
};
@ -323,12 +321,12 @@ impl JMAP {
result_set,
comparators,
paginate
.with_prefix_key(ValueKey::new(
.with_prefix_key(ValueKey {
account_id,
Collection::Email,
0,
Property::ThreadId,
))
collection: Collection::Email.into(),
document_id: 0,
class: ValueClass::Property(Property::ThreadId.into()),
})
.with_prefix_unique(request.arguments.collapse_threads.unwrap_or(false)),
response,
)

View file

@ -52,12 +52,11 @@ use mail_builder::{
use mail_parser::MessageParser;
use store::{
ahash::AHashSet,
fts::term_index::TokenIndex,
write::{
assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, DeserializeFrom, SerializeInto,
ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
BlobKind, Serialize, StoreRead, StoreWrite, ValueKey,
BlobKind, Serialize, StoreWrite,
};
use crate::{auth::AccessToken, Bincode, IngestError, JMAP};
@ -1202,36 +1201,6 @@ impl JMAP {
return Ok(Err(SetError::not_found()));
};
// Delete term index
if let Some(token_index) = self
.store
.get_value::<TokenIndex>(ValueKey::term_index(
account_id,
Collection::Email,
document_id,
))
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "email_delete",
error = ?err,
"Failed to deserialize token index while deleting email.");
MethodError::ServerPartialFail
})?
{
batch.custom(token_index);
} else {
tracing::debug!(
event = "error",
context = "email_delete",
account_id = account_id,
document_id = document_id,
"Failed to fetch term index.",
);
return Ok(Err(SetError::not_found()));
}
// Delete threadId
if let Some(thread_id) = delete_thread_id {
batch
@ -1370,13 +1339,7 @@ impl<
let property = u8::from(property);
batch
.assert_value(
ValueClass::Property {
field: property,
family: 0,
},
&self.current,
)
.assert_value(ValueClass::Property(property), &self.current)
.value(property, self.current.inner, F_VALUE);
for added in self.added {
batch.value(property, added, F_BITMAP);

View file

@ -31,14 +31,7 @@ use jmap_proto::{
};
use mail_parser::{decoders::html::html_to_text, MessageParser, PartType};
use nlp::language::{stemmer::Stemmer, Language};
use store::{
fts::{
builder::MAX_TOKEN_LENGTH,
search_snippet::generate_snippet,
term_index::{self, TermIndex},
},
BlobKind,
};
use store::BlobKind;
use crate::{auth::AccessToken, JMAP};
@ -52,13 +45,13 @@ impl JMAP {
) -> Result<GetSearchSnippetResponse, MethodError> {
let mut filter_stack = vec![];
let mut include_term = true;
let mut terms = vec![];
//let mut terms = vec![];
let mut match_phrase = false;
for cond in request.filter {
match cond {
Filter::Text(text) | Filter::Subject(text) | Filter::Body(text) => {
if include_term {
/*if include_term {
let (text, language) = Language::detect(text, self.config.default_language);
if (text.starts_with('"') && text.ends_with('"'))
|| (text.starts_with('\'') && text.ends_with('\''))
@ -82,7 +75,7 @@ impl JMAP {
.collect::<Vec<_>>(),
);
}
}
}*/
}
Filter::And | Filter::Or => {
filter_stack.push(cond);
@ -114,145 +107,146 @@ impl JMAP {
return Err(MethodError::RequestTooLarge);
}
for email_id in email_ids {
let document_id = email_id.document_id();
let mut snippet = SearchSnippet {
email_id,
subject: None,
preview: None,
};
if !document_ids.contains(document_id) {
response.not_found.push(email_id);
continue;
} else if terms.is_empty() {
response.list.push(snippet);
continue;
}
// Obtain the term index and raw message
let (term_index, raw_message) = if let (Some(term_index), Some(raw_message)) = (
self.get_term_index::<TermIndex>(account_id, Collection::Email, document_id)
.await?,
self.get_blob(
&BlobKind::LinkedMaildir {
account_id,
document_id,
},
0..u32::MAX,
)
.await?,
) {
(term_index, raw_message)
} else {
response.not_found.push(email_id);
continue;
};
// Parse message
let message = if let Some(message) = MessageParser::new().parse(&raw_message) {
message
} else {
response.not_found.push(email_id);
continue;
};
// Build the match terms
let mut match_terms = Vec::new();
for term in &terms {
for (word, stemmed_word) in term {
match_terms.push(term_index.get_match_term(word, stemmed_word.as_deref()));
}
}
'outer: for term_group in term_index
.match_terms(&match_terms, None, match_phrase, true, true)
.map_err(|err| match err {
term_index::Error::InvalidArgument => {
MethodError::UnsupportedFilter("Too many search terms.".to_string())
/*
for email_id in email_ids {
let document_id = email_id.document_id();
let mut snippet = SearchSnippet {
email_id,
subject: None,
preview: None,
};
if !document_ids.contains(document_id) {
response.not_found.push(email_id);
continue;
} else if terms.is_empty() {
response.list.push(snippet);
continue;
}
err => {
tracing::error!(
account_id = account_id,
document_id = document_id,
reason = ?err,
"Failed to generate search snippet.");
MethodError::UnsupportedFilter(
"Failed to generate search snippet.".to_string(),
// Obtain the term index and raw message
let (term_index, raw_message) = if let (Some(term_index), Some(raw_message)) = (
self.get_term_index::<TermIndex>(account_id, Collection::Email, document_id)
.await?,
self.get_blob(
&BlobKind::LinkedMaildir {
account_id,
document_id,
},
0..u32::MAX,
)
}
})?
.unwrap_or_default()
{
if term_group.part_id == 0 {
// Generate subject snippent
snippet.subject =
generate_snippet(&term_group.terms, message.subject().unwrap_or_default());
} else {
let mut part_num = 1;
for part in &message.parts {
match &part.body {
PartType::Text(text) => {
if part_num == term_group.part_id {
snippet.preview = generate_snippet(&term_group.terms, text);
break 'outer;
} else {
part_num += 1;
}
}
PartType::Html(html) => {
if part_num == term_group.part_id {
snippet.preview =
generate_snippet(&term_group.terms, &html_to_text(html));
break 'outer;
} else {
part_num += 1;
}
}
PartType::Message(message) => {
if let Some(subject) = message.subject() {
if part_num == term_group.part_id {
snippet.preview =
generate_snippet(&term_group.terms, subject);
break 'outer;
} else {
part_num += 1;
}
}
for sub_part in message.parts.iter().take(MAX_MESSAGE_PARTS) {
match &sub_part.body {
PartType::Text(text) => {
if part_num == term_group.part_id {
snippet.preview =
generate_snippet(&term_group.terms, text);
break 'outer;
} else {
part_num += 1;
}
}
PartType::Html(html) => {
if part_num == term_group.part_id {
snippet.preview = generate_snippet(
&term_group.terms,
&html_to_text(html),
);
break 'outer;
} else {
part_num += 1;
}
}
_ => (),
}
}
}
_ => (),
.await?,
) {
(term_index, raw_message)
} else {
response.not_found.push(email_id);
continue;
};
// Parse message
let message = if let Some(message) = MessageParser::new().parse(&raw_message) {
message
} else {
response.not_found.push(email_id);
continue;
};
// Build the match terms
let mut match_terms = Vec::new();
for term in &terms {
for (word, stemmed_word) in term {
match_terms.push(term_index.get_match_term(word, stemmed_word.as_deref()));
}
}
'outer: for term_group in term_index
.match_terms(&match_terms, None, match_phrase, true, true)
.map_err(|err| match err {
term_index::Error::InvalidArgument => {
MethodError::UnsupportedFilter("Too many search terms.".to_string())
}
err => {
tracing::error!(
account_id = account_id,
document_id = document_id,
reason = ?err,
"Failed to generate search snippet.");
MethodError::UnsupportedFilter(
"Failed to generate search snippet.".to_string(),
)
}
})?
.unwrap_or_default()
{
if term_group.part_id == 0 {
// Generate subject snippent
snippet.subject =
generate_snippet(&term_group.terms, message.subject().unwrap_or_default());
} else {
let mut part_num = 1;
for part in &message.parts {
match &part.body {
PartType::Text(text) => {
if part_num == term_group.part_id {
snippet.preview = generate_snippet(&term_group.terms, text);
break 'outer;
} else {
part_num += 1;
}
}
PartType::Html(html) => {
if part_num == term_group.part_id {
snippet.preview =
generate_snippet(&term_group.terms, &html_to_text(html));
break 'outer;
} else {
part_num += 1;
}
}
PartType::Message(message) => {
if let Some(subject) = message.subject() {
if part_num == term_group.part_id {
snippet.preview =
generate_snippet(&term_group.terms, subject);
break 'outer;
} else {
part_num += 1;
}
}
for sub_part in message.parts.iter().take(MAX_MESSAGE_PARTS) {
match &sub_part.body {
PartType::Text(text) => {
if part_num == term_group.part_id {
snippet.preview =
generate_snippet(&term_group.terms, text);
break 'outer;
} else {
part_num += 1;
}
}
PartType::Html(html) => {
if part_num == term_group.part_id {
snippet.preview = generate_snippet(
&term_group.terms,
&html_to_text(html),
);
break 'outer;
} else {
part_num += 1;
}
}
_ => (),
}
}
}
_ => (),
}
}
}
}
response.list.push(snippet);
}
}
response.list.push(snippet);
}
*/
Ok(response)
}
}

View file

@ -56,8 +56,9 @@ use store::{
Comparator, Filter, ResultSet, SortedResultSet,
},
roaring::RoaringBitmap,
write::{BatchBuilder, BitmapFamily, ToBitmaps},
BitmapKey, Deserialize, Serialize, StoreId, StoreInit, StoreRead, StoreWrite, ValueKey,
write::{key::KeySerializer, BatchBuilder, BitmapClass, TagValue, ToBitmaps, ValueClass},
BitmapKey, Deserialize, Key, Serialize, StoreId, StoreInit, StoreRead, StoreWrite, ValueKey,
SUBSPACE_VALUES,
};
use tokio::sync::mpsc;
use utils::{
@ -425,7 +426,12 @@ impl JMAP {
let property = property.as_ref();
match self
.store
.get_value::<U>(ValueKey::new(account_id, collection, document_id, property))
.get_value::<U>(ValueKey {
account_id,
collection: collection.into(),
document_id,
class: ValueClass::Property(property.into()),
})
.await
{
Ok(value) => Ok(value),
@ -454,11 +460,17 @@ impl JMAP {
U: Deserialize + 'static,
{
let property = property.as_ref();
match self
.store
.get_values::<U>(
document_ids
.map(|document_id| ValueKey::new(account_id, collection, document_id, property))
.map(|document_id| ValueKey {
account_id,
collection: collection.into(),
document_id,
class: ValueClass::Property(property.into()),
})
.collect(),
)
.await
@ -477,37 +489,6 @@ impl JMAP {
}
}
pub async fn get_term_index<T: Deserialize + 'static>(
&self,
account_id: u32,
collection: Collection,
document_id: u32,
) -> Result<Option<T>, MethodError> {
match self
.store
.get_value::<T>(ValueKey {
account_id,
collection: collection.into(),
document_id,
family: u8::MAX,
field: u8::MAX,
})
.await
{
Ok(value) => Ok(value),
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_id = account_id,
collection = ?collection,
document_id = document_id,
error = ?err,
"Failed to retrieve term index");
Err(MethodError::ServerPartialFail)
}
}
}
pub async fn get_document_ids(
&self,
account_id: u32,
@ -536,12 +517,20 @@ impl JMAP {
account_id: u32,
collection: Collection,
property: impl AsRef<Property>,
value: impl BitmapFamily + Serialize,
value: impl Into<TagValue>,
) -> Result<Option<RoaringBitmap>, MethodError> {
let property = property.as_ref();
match self
.store
.get_bitmap(BitmapKey::value(account_id, collection, property, value))
.get_bitmap(BitmapKey {
account_id,
collection: collection.into(),
class: BitmapClass::Tag {
field: property.into(),
value: value.into(),
},
block_num: 0,
})
.await
{
Ok(value) => Ok(value),
@ -601,15 +590,18 @@ impl JMAP {
}
pub async fn get_used_quota(&self, account_id: u32) -> Result<i64, MethodError> {
self.store.get_quota(account_id).await.map_err(|err| {
tracing::error!(
self.store
.get_counter(NamedKey::Quota::<&[u8]>(account_id))
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "get_used_quota",
account_id = account_id,
error = ?err,
"Failed to obtain used disk quota for account.");
MethodError::ServerPartialFail
})
MethodError::ServerPartialFail
})
}
pub async fn filter(
@ -805,3 +797,67 @@ impl UpdateResults for QueryResponse {
}
}
}
pub enum NamedKey<T: AsRef<[u8]>> {
Name(T),
Id(u32),
Quota(u32),
}
impl<T: AsRef<[u8]>> From<&NamedKey<T>> for ValueClass {
fn from(key: &NamedKey<T>) -> Self {
match key {
NamedKey::Name(name) => ValueClass::Named(
KeySerializer::new(name.as_ref().len() + 1)
.write(0u8)
.write(name.as_ref())
.finalize(),
),
NamedKey::Id(id) => ValueClass::Named(
KeySerializer::new(std::mem::size_of::<u32>())
.write(1u8)
.write_leb128(*id)
.finalize(),
),
NamedKey::Quota(id) => ValueClass::Named(
KeySerializer::new(std::mem::size_of::<u32>())
.write(2u8)
.write_leb128(*id)
.finalize(),
),
}
}
}
impl<T: AsRef<[u8]>> From<NamedKey<T>> for ValueClass {
fn from(key: NamedKey<T>) -> Self {
(&key).into()
}
}
impl<T: AsRef<[u8]>> From<NamedKey<T>> for ValueKey<ValueClass> {
fn from(key: NamedKey<T>) -> Self {
ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: key.into(),
}
}
}
impl<T: AsRef<[u8]> + Sync + Send> Key for NamedKey<T> {
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::from(self),
}
.serialize(include_subspace)
}
fn subspace(&self) -> u8 {
SUBSPACE_VALUES
}
}

View file

@ -28,7 +28,10 @@ use jmap_proto::{
object::Object,
types::{collection::Collection, property::Property, type_state::DataType, value::Value},
};
use store::{write::now, BitmapKey, StoreRead, ValueKey};
use store::{
write::{now, ValueClass},
BitmapKey, StoreRead, ValueKey,
};
use utils::map::bitmap::Bitmap;
use crate::{auth::AccessToken, services::state, JMAP};
@ -129,12 +132,12 @@ impl JMAP {
for document_id in document_ids {
let mut subscription = self
.store
.get_value::<Object<Value>>(ValueKey::new(
.get_value::<Object<Value>>(ValueKey {
account_id,
Collection::PushSubscription,
collection: Collection::PushSubscription.into(),
document_id,
Property::Value,
))
class: ValueClass::Property(Property::Value.into()),
})
.await?
.ok_or_else(|| {
store::Error::InternalError(format!(

View file

@ -50,7 +50,7 @@ use store::{
BlobKind, StoreWrite,
};
use crate::{auth::AccessToken, JMAP};
use crate::{auth::AccessToken, NamedKey, JMAP};
struct SetContext<'x> {
account_id: u32,
@ -68,7 +68,6 @@ pub static SCHEMA: &[IndexProperty] = &[
.max_size(255)
.required(),
IndexProperty::new(Property::IsActive).index_as(IndexAs::Integer),
IndexProperty::new(Property::Size).index_as(IndexAs::Quota),
];
impl JMAP {
@ -114,6 +113,10 @@ impl JMAP {
.with_account_id(account_id)
.with_collection(Collection::SieveScript)
.create_document(document_id)
.add(
NamedKey::Quota::<&[u8]>(account_id),
builder.changes().unwrap().script_size(),
)
.custom(builder);
sieve_ids.insert(document_id);
self.write_batch(batch).await?;
@ -163,19 +166,29 @@ impl JMAP {
)
.await?
{
let prev_size = sieve.inner.script_size();
match self
.sieve_set_item(object, (document_id, sieve).into(), &ctx)
.await?
{
Ok((builder, blob)) => {
// Store blob
let blob_id = if let Some(blob) = blob {
let (update_quota, blob_id) = if let Some(blob) = blob {
let blob_id =
BlobId::linked(account_id, Collection::SieveScript, document_id);
self.put_blob(&blob_id.kind, &blob).await?;
Some(blob_id.with_section_size(blob.len()))
let blob_size = builder.changes().unwrap().script_size();
(
match blob_size.cmp(&prev_size) {
std::cmp::Ordering::Greater => blob_size - prev_size,
std::cmp::Ordering::Less => -prev_size + blob_size,
std::cmp::Ordering::Equal => 0,
},
Some(blob_id.with_section_size(blob.len())),
)
} else {
None
(0, None)
};
// Write record
@ -185,6 +198,9 @@ impl JMAP {
.with_collection(Collection::SieveScript)
.update_document(document_id)
.custom(builder);
if update_quota != 0 {
batch.add(NamedKey::Quota::<&[u8]>(account_id), update_quota);
}
if !batch.is_empty() {
changes.log_update(Collection::SieveScript, document_id);
match self.store.write(batch.build()).await {
@ -333,6 +349,10 @@ impl JMAP {
.with_collection(Collection::SieveScript)
.delete_document(document_id)
.value(Property::EmailIds, (), F_VALUE | F_CLEAR)
.add(
NamedKey::Quota::<&[u8]>(account_id),
-(obj.inner.script_size()),
)
.custom(ObjectIndexBuilder::new(SCHEMA).with_current(obj));
self.write_batch(batch).await?;
let _ = self
@ -605,3 +625,16 @@ impl JMAP {
Ok(changed_ids)
}
}
pub trait ScriptSize {
fn script_size(&self) -> i64;
}
impl ScriptSize for Object<Value> {
fn script_size(&self) -> i64 {
self.properties
.get(&Property::Size)
.and_then(|v| v.as_uint())
.unwrap_or_default() as i64
}
}

View file

@ -45,7 +45,10 @@ use store::{
BlobKind,
};
use crate::{sieve::set::SCHEMA, JMAP};
use crate::{
sieve::set::{ScriptSize, SCHEMA},
NamedKey, JMAP,
};
impl JMAP {
pub async fn vacation_response_set(
@ -224,10 +227,25 @@ impl JMAP {
.with_changes(changes);
// Create sieve script only if there are changes
let script_blob = if build_script {
self.build_script(&mut obj)?.into()
let (update_quota, script_blob) = if build_script {
let script_blob = self.build_script(&mut obj)?;
let script_size = obj.changes().unwrap().script_size();
(
if let Some(current) = obj.current() {
let current_script_size = current.inner.script_size();
match script_size.cmp(&current_script_size) {
std::cmp::Ordering::Greater => script_size - current_script_size,
std::cmp::Ordering::Less => -current_script_size + script_size,
std::cmp::Ordering::Equal => 0,
}
} else {
script_size
},
Some(script_blob),
)
} else {
None
(0, None)
};
// Write changes
@ -247,6 +265,9 @@ impl JMAP {
document_id
};
if !batch.is_empty() {
if update_quota != 0 {
batch.add(NamedKey::Quota::<&[u8]>(account_id), update_quota);
}
self.write_batch(batch).await?;
}

View file

@ -30,15 +30,16 @@ use roaring::RoaringBitmap;
use crate::{
query::{self, Operator},
write::key::{DeserializeBigEndian, KeySerializer},
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, StoreRead, SUBSPACE_INDEXES,
SUBSPACE_QUOTAS,
write::{
key::{DeserializeBigEndian, KeySerializer},
BitmapClass, ValueClass,
},
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, StoreRead, ValueKey,
SUBSPACE_INDEXES, SUBSPACE_QUOTAS,
};
use super::{bitmap::DeserializeBlock, FdbStore};
//trx
#[async_trait::async_trait]
impl StoreRead for FdbStore {
async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
@ -55,9 +56,9 @@ impl StoreRead for FdbStore {
}
}
async fn get_bitmap<T: AsRef<[u8]> + Sync + Send>(
async fn get_bitmap(
&self,
mut key: BitmapKey<T>,
mut key: BitmapKey<BitmapClass>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut bm = RoaringBitmap::new();
let begin = key.serialize(true);
@ -314,24 +315,14 @@ impl StoreRead for FdbStore {
Ok(None)
}
async fn get_quota(&self, account_id: u32) -> crate::Result<i64> {
if let Some(bytes) = self
.db
.create_trx()?
.get(
&KeySerializer::new(5)
.write(SUBSPACE_QUOTAS)
.write(account_id)
.finalize(),
true,
)
.await?
{
async fn get_counter(
&self,
key: impl Into<ValueKey<ValueClass>> + Sync + Send,
) -> crate::Result<i64> {
let key = key.into().serialize(true);
if let Some(bytes) = self.db.create_trx()?.get(&key, true).await? {
Ok(i64::from_le_bytes(bytes[..].try_into().map_err(|_| {
crate::Error::InternalError(format!(
"Invalid quota value for account {}",
account_id
))
crate::Error::InternalError("Invalid counter value.".to_string())
})?))
} else {
Ok(0)

View file

@ -27,9 +27,8 @@ use ahash::AHashMap;
use foundationdb::{options::MutationType, FdbError};
use crate::{
write::{key::KeySerializer, Batch, Operation, ValueClass},
AclKey, BitmapKey, IndexKey, Key, LogKey, StoreWrite, ValueKey, SUBSPACE_QUOTAS,
SUBSPACE_VALUES,
write::{Batch, Operation, ValueOp},
BitmapKey, IndexKey, Key, LogKey, StoreWrite, ValueKey,
};
use super::{bitmap::DenseBitmap, FdbStore};
@ -87,31 +86,30 @@ impl StoreWrite for FdbStore {
} => {
document_id = *document_id_;
}
Operation::Value { class, set } => {
let key = match class {
ValueClass::Property { field, family } => ValueKey {
account_id,
collection,
document_id,
family: *family,
field: *field,
}
.serialize(true),
ValueClass::Acl { grant_account_id } => AclKey {
grant_account_id: *grant_account_id,
to_account_id: account_id,
to_collection: collection,
to_document_id: document_id,
}
.serialize(true),
ValueClass::Custom { bytes } => {
let mut key = Vec::with_capacity(1 + bytes.len());
key.push(SUBSPACE_VALUES);
key.extend_from_slice(bytes);
key
}
};
if let Some(value) = set {
Operation::Value {
class,
op: ValueOp::Add(by),
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(true);
trx.atomic_op(&key, &by.to_le_bytes()[..], MutationType::Add);
}
Operation::Value { class, op } => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(true);
if let ValueOp::Set(value) = op {
trx.set(&key, value);
} else {
trx.clear(&key);
@ -126,18 +124,14 @@ impl StoreWrite for FdbStore {
key,
}
.serialize(true);
if *set {
trx.set(&key, &[]);
} else {
trx.clear(&key);
}
}
Operation::Bitmap {
family,
field,
key,
set,
} => {
Operation::Bitmap { class, set } => {
if retry_count == 0 {
if *set {
&mut set_bitmaps
@ -148,10 +142,8 @@ impl StoreWrite for FdbStore {
BitmapKey {
account_id,
collection,
family: *family,
field: *field,
class,
block_num: DenseBitmap::block_num(document_id),
key,
}
.serialize(true),
)
@ -176,29 +168,13 @@ impl StoreWrite for FdbStore {
class,
assert_value,
} => {
let key = match class {
ValueClass::Property { field, family } => ValueKey {
account_id,
collection,
document_id,
family: *family,
field: *field,
}
.serialize(true),
ValueClass::Acl { grant_account_id } => AclKey {
grant_account_id: *grant_account_id,
to_account_id: account_id,
to_collection: collection,
to_document_id: document_id,
}
.serialize(true),
ValueClass::Custom { bytes } => {
let mut key = Vec::with_capacity(1 + bytes.len());
key.push(SUBSPACE_VALUES);
key.extend_from_slice(bytes);
key
}
};
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(true);
let matches = if let Ok(bytes) = trx.get(&key, false).await {
if let Some(bytes) = bytes {
@ -215,16 +191,6 @@ impl StoreWrite for FdbStore {
return Err(crate::Error::AssertValueFailed);
}
}
Operation::UpdateQuota { bytes } => {
trx.atomic_op(
&KeySerializer::new(5)
.write(SUBSPACE_QUOTAS)
.write(account_id)
.finalize(),
&bytes.to_le_bytes()[..],
MutationType::Add,
);
}
}
}
@ -257,19 +223,12 @@ impl StoreWrite for FdbStore {
} => {
document_id = *document_id_;
}
Operation::Bitmap {
family,
field,
key,
set,
} => {
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
family: *family,
field: *field,
class,
block_num: DenseBitmap::block_num(document_id),
key,
}
.serialize(true);
if *set {

View file

@ -27,3 +27,6 @@ pub mod foundationdb;
pub mod rocksdb;
#[cfg(feature = "sqlite")]
pub mod sqlite;
pub(crate) const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 2) as usize;
pub(crate) const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;

View file

@ -86,7 +86,7 @@ impl Store {
#[inline(always)]
pub fn get_bitmap<T: AsRef<[u8]>>(
&self,
key: BitmapKey<T>,
key: BitmapKey,
) -> crate::Result<Option<RoaringBitmap>> {
let key = key.serialize();
if let Some(bytes) = self

View file

@ -107,7 +107,7 @@ impl SqliteStore {
conn.execute(
"CREATE TABLE IF NOT EXISTS q (
k INTEGER PRIMARY KEY,
k BLOB PRIMARY KEY,
v INTEGER NOT NULL DEFAULT 0
)",
[],
@ -142,15 +142,17 @@ impl SqliteStore {
Ok(())
}
pub async fn spawn_worker<U, V>(&self, f: U) -> crate::Result<V>
pub async fn spawn_worker<U, V>(&self, mut f: U) -> crate::Result<V>
where
U: FnOnce() -> crate::Result<V> + Send + 'static,
U: FnMut() -> crate::Result<V> + Send,
V: Sync + Send + 'static,
{
let (tx, rx) = oneshot::channel();
self.worker_pool.spawn(move || {
tx.send(f()).ok();
self.worker_pool.scope(|s| {
s.spawn(|_| {
tx.send(f()).ok();
});
});
match rx.await {

View file

@ -26,8 +26,11 @@ use rusqlite::OptionalExtension;
use crate::{
query::{self, Operator},
write::key::{DeserializeBigEndian, KeySerializer},
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, StoreRead,
write::{
key::{DeserializeBigEndian, KeySerializer},
BitmapClass, ValueClass,
},
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, StoreRead, ValueKey,
};
use super::{SqliteStore, BITS_PER_BLOCK, WORDS_PER_BLOCK, WORD_SIZE_BITS};
@ -36,23 +39,26 @@ use super::{SqliteStore, BITS_PER_BLOCK, WORDS_PER_BLOCK, WORD_SIZE_BITS};
impl StoreRead for SqliteStore {
async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
where
U: Deserialize,
U: Deserialize + 'static,
{
let conn = self.conn_pool.get()?;
let key = key.serialize(false);
let mut result = conn.prepare_cached("SELECT v FROM v WHERE k = ?")?;
result
.query_row([&key], |row| {
U::deserialize(row.get_ref(0)?.as_bytes()?)
.map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into()))
})
.optional()
.map_err(Into::into)
self.spawn_worker(move || {
let key = key.serialize(false);
let mut result = conn.prepare_cached("SELECT v FROM v WHERE k = ?")?;
result
.query_row([&key], |row| {
U::deserialize(row.get_ref(0)?.as_bytes()?)
.map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into()))
})
.optional()
.map_err(Into::into)
})
.await
}
async fn get_bitmap<T: AsRef<[u8]> + Sync + Send>(
async fn get_bitmap(
&self,
mut key: BitmapKey<T>,
mut key: BitmapKey<BitmapClass>,
) -> crate::Result<Option<RoaringBitmap>> {
let begin = key.serialize(false);
key.block_num = u32::MAX;
@ -193,41 +199,45 @@ impl StoreRead for SqliteStore {
let field = field.into();
let conn = self.conn_pool.get()?;
let begin = IndexKeyPrefix {
account_id,
collection,
field,
}
.serialize(false);
let end = IndexKeyPrefix {
account_id,
collection,
field: field + 1,
}
.serialize(false);
let prefix_len = begin.len();
let mut query = conn.prepare_cached(if ascending {
"SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k ASC"
} else {
"SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k DESC"
})?;
let mut rows = query.query([&begin, &end])?;
while let Some(row) = rows.next()? {
let key = row.get_ref(0)?.as_bytes()?;
let id_pos = key.len() - std::mem::size_of::<u32>();
debug_assert!(key.starts_with(&begin));
if !cb(
key.get(prefix_len..id_pos).ok_or_else(|| {
crate::Error::InternalError("Invalid key found in index".to_string())
})?,
key.deserialize_be_u32(id_pos)?,
)? {
return Ok(());
self.spawn_worker(move || {
let begin = IndexKeyPrefix {
account_id,
collection,
field,
}
}
.serialize(false);
let end = IndexKeyPrefix {
account_id,
collection,
field: field + 1,
}
.serialize(false);
let prefix_len = begin.len();
let mut query = conn.prepare_cached(if ascending {
"SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k ASC"
} else {
"SELECT k FROM i WHERE k >= ? AND k < ? ORDER BY k DESC"
})?;
let mut rows = query.query([&begin, &end])?;
Ok(())
while let Some(row) = rows.next()? {
let key = row.get_ref(0)?.as_bytes()?;
let id_pos = key.len() - std::mem::size_of::<u32>();
debug_assert!(key.starts_with(&begin));
if !cb(
key.get(prefix_len..id_pos).ok_or_else(|| {
crate::Error::InternalError("Invalid key found in index".to_string())
})?,
key.deserialize_be_u32(id_pos)?,
)? {
return Ok(());
}
}
Ok(())
})
.await
}
async fn iterate(
@ -239,36 +249,44 @@ impl StoreRead for SqliteStore {
mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result<bool> + Sync + Send,
) -> crate::Result<()> {
let conn = self.conn_pool.get()?;
let table = char::from(begin.subspace());
let begin = begin.serialize(false);
let end = end.serialize(false);
let mut query = conn.prepare_cached(&match (first, ascending) {
(true, true) => {
format!("SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC LIMIT 1")
}
(true, false) => {
format!("SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC LIMIT 1")
}
(false, true) => {
format!("SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC")
}
(false, false) => {
format!("SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC")
}
})?;
let mut rows = query.query([&begin, &end])?;
self.spawn_worker(move || {
let table = char::from(begin.subspace());
let begin = begin.serialize(false);
let end = end.serialize(false);
while let Some(row) = rows.next()? {
let key = row.get_ref(0)?.as_bytes()?;
let value = row.get_ref(1)?.as_bytes()?;
let mut query = conn.prepare_cached(&match (first, ascending) {
(true, true) => {
format!(
"SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC LIMIT 1"
)
}
(true, false) => {
format!(
"SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC LIMIT 1"
)
}
(false, true) => {
format!("SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k ASC")
}
(false, false) => {
format!("SELECT k, v FROM {table} WHERE k >= ? AND k <= ? ORDER BY k DESC")
}
})?;
let mut rows = query.query([&begin, &end])?;
if !cb(key, value)? {
break;
while let Some(row) = rows.next()? {
let key = row.get_ref(0)?.as_bytes()?;
let value = row.get_ref(1)?.as_bytes()?;
if !cb(key, value)? {
break;
}
}
}
Ok(())
Ok(())
})
.await
}
async fn get_last_change_id(
@ -278,33 +296,45 @@ impl StoreRead for SqliteStore {
) -> crate::Result<Option<u64>> {
let conn = self.conn_pool.get()?;
let collection = collection.into();
let key = LogKey {
account_id,
collection,
change_id: u64::MAX,
}
.serialize(false);
let mut results =
conn.prepare_cached("SELECT k FROM l WHERE k < ? ORDER BY k DESC LIMIT 1")?;
results
.query_row([&key], |row| {
let key = row.get_ref(0)?.as_bytes()?;
key.deserialize_be_u64(key.len() - std::mem::size_of::<u64>())
.map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into()))
})
.optional()
.map_err(Into::into)
self.spawn_worker(move || {
let key = LogKey {
account_id,
collection,
change_id: u64::MAX,
}
.serialize(false);
conn.prepare_cached("SELECT k FROM l WHERE k < ? ORDER BY k DESC LIMIT 1")?
.query_row([&key], |row| {
let key = row.get_ref(0)?.as_bytes()?;
key.deserialize_be_u64(key.len() - std::mem::size_of::<u64>())
.map_err(|err| rusqlite::Error::ToSqlConversionFailure(err.into()))
})
.optional()
.map_err(Into::into)
})
.await
}
async fn get_quota(&self, account_id: u32) -> crate::Result<i64> {
async fn get_counter(
&self,
key: impl Into<ValueKey<ValueClass>> + Sync + Send,
) -> crate::Result<i64> {
let key = key.into().serialize(false);
let conn = self.conn_pool.get()?;
let mut results = conn.prepare_cached("SELECT v FROM q WHERE k = ?")?;
match results.query_row([account_id as i64], |row| row.get::<_, i64>(0)) {
Ok(value) => Ok(value),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
Err(e) => Err(e.into()),
}
self.spawn_worker(move || {
match conn
.prepare_cached("SELECT v FROM q WHERE k = ?")?
.query_row([&key], |row| row.get::<_, i64>(0))
{
Ok(value) => Ok(value),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
Err(e) => Err(e.into()),
}
})
.await
}
#[cfg(feature = "test_mode")]
@ -377,11 +407,11 @@ impl StoreRead for SqliteStore {
let mut rows = query.query([]).unwrap();
while let Some(row) = rows.next().unwrap() {
let key = row.get::<_, i64>(0).unwrap();
let key = row.get_ref(0).unwrap().as_bytes().unwrap();
let value = row.get::<_, i64>(1).unwrap();
if value != 0 {
eprintln!(
"Table quota is not empty, account {}, quota: {}",
"Table counter is not empty, account {:?}, quota: {}",
key, value,
);
has_errors = true;

View file

@ -24,8 +24,8 @@
use rusqlite::{params, OptionalExtension, TransactionBehavior};
use crate::{
write::{Batch, Operation, ValueClass},
AclKey, BitmapKey, IndexKey, Key, LogKey, StoreWrite, ValueKey,
write::{Batch, Operation, ValueOp},
BitmapKey, IndexKey, Key, LogKey, StoreWrite, ValueKey,
};
use super::{SqliteStore, BITS_MASK, BITS_PER_BLOCK};
@ -121,27 +121,39 @@ impl StoreWrite for SqliteStore {
bitmap_value_set = (1u64 << (index as u64 & 63)) as i64;
bitmap_value_clear = (!(1u64 << (index as u64 & 63))) as i64;
}
Operation::Value { class, set } => {
let key = match class {
ValueClass::Property { field, family } => ValueKey {
account_id,
collection,
document_id,
family: *family,
field: *field,
}
.serialize(false),
ValueClass::Acl { grant_account_id } => AclKey {
grant_account_id: *grant_account_id,
to_account_id: account_id,
to_collection: collection,
to_document_id: document_id,
}
.serialize(false),
ValueClass::Custom { bytes } => bytes.to_vec(),
};
Operation::Value {
class,
op: ValueOp::Add(by),
} => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
if let Some(value) = set {
if *by >= 0 {
trx.prepare_cached(concat!(
"INSERT INTO q (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + excluded.v"
))?
.execute(params![&key, *by])?;
} else {
trx.prepare_cached("UPDATE q SET v = v + ? WHERE k = ?")?
.execute(params![*by, &key])?;
}
}
Operation::Value { class, op } => {
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
if let ValueOp::Set(value) = op {
trx.prepare_cached("INSERT OR REPLACE INTO v (k, v) VALUES (?, ?)")?
.execute([&key, value])?;
} else {
@ -167,25 +179,16 @@ impl StoreWrite for SqliteStore {
.execute([&key])?;
}
}
Operation::Bitmap {
family,
field,
key,
set,
} => {
Operation::Bitmap { class, set } => {
let key = BitmapKey {
account_id,
collection,
family: *family,
field: *field,
class,
block_num: bitmap_block_num,
key,
}
.serialize(false);
if *set {
//trx.prepare_cached("INSERT OR IGNORE INTO b (z) VALUES (?)")?
// .execute([&key])?;
trx.prepare_cached(SET_QUERIES[bitmap_col_num])?
.execute(params![bitmap_value_set, &key])?;
if trx.changes() == 0 {
@ -217,24 +220,14 @@ impl StoreWrite for SqliteStore {
class,
assert_value,
} => {
let key = match class {
ValueClass::Property { field, family } => ValueKey {
account_id,
collection,
document_id,
family: *family,
field: *field,
}
.serialize(false),
ValueClass::Acl { grant_account_id } => AclKey {
grant_account_id: *grant_account_id,
to_account_id: account_id,
to_collection: collection,
to_document_id: document_id,
}
.serialize(false),
ValueClass::Custom { bytes } => bytes.to_vec(),
};
let key = ValueKey {
account_id,
collection,
document_id,
class,
}
.serialize(false);
let matches = trx
.prepare_cached("SELECT v FROM v WHERE k = ?")?
.query_row([&key], |row| {
@ -246,18 +239,6 @@ impl StoreWrite for SqliteStore {
return Err(crate::Error::AssertValueFailed);
}
}
Operation::UpdateQuota { bytes } => {
if *bytes >= 0 {
trx.prepare_cached(concat!(
"INSERT INTO q (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + excluded.v"
))?
.execute(params![account_id, *bytes])?;
} else {
trx.prepare_cached("UPDATE q SET v = v + ? WHERE k = ?")?
.execute(params![*bytes, account_id])?;
}
}
}
}

View file

@ -25,7 +25,7 @@ use std::{fmt::Display, ops::BitAndAssign};
pub mod backend;
pub mod blob;
pub mod fts;
//pub mod fts;
pub mod query;
pub mod write;
@ -36,7 +36,7 @@ use query::{filter::StoreQuery, log::StoreLog, sort::StoreSort};
pub use rand;
pub use roaring;
use roaring::RoaringBitmap;
use write::Batch;
use write::{Batch, BitmapClass, ValueClass};
#[cfg(feature = "rocks")]
pub struct Store {
@ -57,16 +57,14 @@ pub trait Key: Sync + Send {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BitmapKey<T: AsRef<[u8]>> {
pub struct BitmapKey<T: AsRef<BitmapClass>> {
pub account_id: u32,
pub collection: u8,
pub family: u8,
pub field: u8,
pub class: T,
pub block_num: u32,
pub key: T,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IndexKey<T: AsRef<[u8]>> {
pub account_id: u32,
pub collection: u8,
@ -83,25 +81,11 @@ pub struct IndexKeyPrefix {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ValueKey {
pub struct ValueKey<T: AsRef<ValueClass>> {
pub account_id: u32,
pub collection: u8,
pub document_id: u32,
pub family: u8,
pub field: u8,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CustomValueKey {
pub value: Vec<u8>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AclKey {
pub grant_account_id: u32,
pub to_account_id: u32,
pub to_collection: u8,
pub to_document_id: u32,
pub class: T,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -169,20 +153,6 @@ impl From<String> for Error {
}
}
pub const BM_DOCUMENT_IDS: u8 = 0;
pub const BM_TAG: u8 = 1 << 5;
pub const BM_HASH: u8 = 1 << 6;
pub const HASH_EXACT: u8 = 0;
pub const HASH_STEMMED: u8 = 1 << 6;
pub const BLOOM_BIGRAM: u8 = 1 << 0;
pub const BLOOM_TRIGRAM: u8 = 1 << 1;
pub const TAG_ID: u8 = 0;
pub const TAG_TEXT: u8 = 1 << 0;
pub const TAG_STATIC: u8 = 1 << 1;
pub const SUBSPACE_BITMAPS: u8 = b'b';
pub const SUBSPACE_VALUES: u8 = b'v';
pub const SUBSPACE_LOGS: u8 = b'l';
@ -229,14 +199,12 @@ pub trait StoreRead: Sync {
Ok(results)
}
async fn get_bitmap<T: AsRef<[u8]> + Sync + Send>(
&self,
key: BitmapKey<T>,
) -> crate::Result<Option<RoaringBitmap>>;
async fn get_bitmap(&self, key: BitmapKey<BitmapClass>)
-> crate::Result<Option<RoaringBitmap>>;
async fn get_bitmaps_intersection<T: AsRef<[u8]> + Sync + Send>(
async fn get_bitmaps_intersection(
&self,
keys: Vec<BitmapKey<T>>,
keys: Vec<BitmapKey<BitmapClass>>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for key in keys {
@ -289,7 +257,10 @@ pub trait StoreRead: Sync {
collection: impl Into<u8> + Sync + Send,
) -> crate::Result<Option<u64>>;
async fn get_quota(&self, account_id: u32) -> crate::Result<i64>;
async fn get_counter(
&self,
key: impl Into<ValueKey<ValueClass>> + Sync + Send,
) -> crate::Result<i64>;
#[cfg(feature = "test_mode")]
async fn assert_is_empty(&self);
@ -308,6 +279,16 @@ pub trait StoreWrite {
}
pub trait Store:
StoreInit + StoreRead + StoreWrite + StoreId + StorePurge + StoreQuery + StoreSort + StoreLog
StoreInit
+ StoreRead
+ StoreWrite
+ StoreId
+ StorePurge
+ StoreQuery
+ StoreSort
+ StoreLog
+ Sync
+ Send
+ 'static
{
}

View file

@ -27,7 +27,7 @@ use ahash::HashSet;
use nlp::tokenizers::space::SpaceTokenizer;
use roaring::RoaringBitmap;
use crate::{fts::builder::MAX_TOKEN_LENGTH, BitmapKey, StoreRead};
use crate::{backend::MAX_TOKEN_LENGTH, BitmapKey, StoreRead};
use super::{Filter, ResultSet};
@ -79,23 +79,21 @@ pub trait StoreQuery: StoreRead {
.collect::<HashSet<String>>()
.into_iter()
.map(|word| {
BitmapKey::hash(&word, account_id, collection, 0, field)
BitmapKey::text_token(account_id, collection, field, word)
})
.collect(),
)
.await?
} else {
self.get_bitmap(BitmapKey::hash(&text, account_id, collection, 0, field))
self.get_bitmap(BitmapKey::text_token(account_id, collection, field, text))
.await?
}
}
Filter::InBitmap { family, field, key } => {
Filter::InBitmap(class) => {
self.get_bitmap(BitmapKey {
account_id,
collection,
family,
field,
key: &key,
class,
block_num: 0,
})
.await?

View file

@ -27,7 +27,10 @@ pub mod sort;
use roaring::RoaringBitmap;
use crate::{write::BitmapFamily, BitmapKey, Deserialize, Serialize, BM_DOCUMENT_IDS};
use crate::{
write::{BitmapClass, TagValue},
BitmapKey, Serialize,
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Operator {
@ -50,11 +53,7 @@ pub enum Filter {
text: String,
tokenize: bool,
},
InBitmap {
family: u8,
field: u8,
key: Vec<u8>,
},
InBitmap(BitmapClass),
DocumentSet(RoaringBitmap),
And,
Or,
@ -202,12 +201,11 @@ impl Filter {
}
}
pub fn is_in_bitmap(field: impl Into<u8>, value: impl BitmapFamily + Serialize) -> Self {
Self::InBitmap {
family: value.family(),
pub fn is_in_bitmap(field: impl Into<u8>, value: impl Into<TagValue>) -> Self {
Self::InBitmap(BitmapClass::Tag {
field: field.into(),
key: value.serialize(),
}
value: value.into(),
})
}
pub fn is_in_set(set: RoaringBitmap) -> Self {
@ -242,19 +240,52 @@ impl Comparator {
}
}
impl BitmapKey<&'static [u8]> {
impl BitmapKey<BitmapClass> {
pub fn document_ids(account_id: u32, collection: impl Into<u8>) -> Self {
BitmapKey {
account_id,
collection: collection.into(),
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: b"",
class: BitmapClass::DocumentIds,
block_num: 0,
}
}
pub fn text_token(
account_id: u32,
collection: impl Into<u8>,
field: impl Into<u8>,
token: impl Into<Vec<u8>>,
) -> Self {
BitmapKey {
account_id,
collection: collection.into(),
class: BitmapClass::Text {
field: field.into(),
token: token.into(),
},
block_num: 0,
}
}
pub fn tag(
account_id: u32,
collection: impl Into<u8>,
field: impl Into<u8>,
value: impl Into<TagValue>,
) -> Self {
BitmapKey {
account_id,
collection: collection.into(),
class: BitmapClass::Tag {
field: field.into(),
value: value.into(),
},
block_num: 0,
}
}
}
/*
#[derive(Debug)]
pub struct RawValue<T: Deserialize> {
pub raw: Vec<u8>,
@ -269,3 +300,4 @@ impl<T: Deserialize> Deserialize for RawValue<T> {
})
}
}
*/

View file

@ -25,7 +25,7 @@ use std::cmp::Ordering;
use ahash::{AHashMap, AHashSet};
use crate::{StoreRead, ValueKey};
use crate::{write::ValueClass, StoreRead, ValueKey};
use super::{Comparator, ResultSet, SortedResultSet};
@ -38,7 +38,7 @@ pub struct Pagination {
has_anchor: bool,
anchor_found: bool,
ids: Vec<u64>,
prefix_key: Option<ValueKey>,
prefix_key: Option<ValueKey<ValueClass>>,
prefix_unique: bool,
}
@ -111,7 +111,7 @@ pub trait StoreSort: StoreRead {
if let Some(prefix_key) = prefix_key {
for id in sorted_results.ids.iter_mut() {
if let Some(prefix_id) = self
.get_value::<u32>(prefix_key.with_document_id(*id as u32))
.get_value::<u32>(prefix_key.clone().with_document_id(*id as u32))
.await?
{
*id |= (prefix_id as u64) << 32;
@ -198,7 +198,7 @@ pub trait StoreSort: StoreRead {
// Obtain document prefixId
let prefix_id = if let Some(prefix_key) = &paginate.prefix_key {
if let Some(prefix_id) = self
.get_value(prefix_key.with_document_id(document_id))
.get_value(prefix_key.clone().with_document_id(document_id))
.await?
{
if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) {
@ -226,7 +226,7 @@ pub trait StoreSort: StoreRead {
// Obtain document prefixId
let prefix_id = if let Some(prefix_key) = &paginate.prefix_key {
if let Some(prefix_id) = self
.get_value(prefix_key.with_document_id(document_id))
.get_value(prefix_key.clone().with_document_id(document_id))
.await?
{
if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) {
@ -269,7 +269,7 @@ impl Pagination {
}
}
pub fn with_prefix_key(mut self, prefix_key: ValueKey) -> Self {
pub fn with_prefix_key(mut self, prefix_key: ValueKey<ValueClass>) -> Self {
self.prefix_key = Some(prefix_key);
self
}

View file

@ -21,11 +21,9 @@
* for more details.
*/
use crate::BM_DOCUMENT_IDS;
use super::{
assert::ToAssertValue, Batch, BatchBuilder, BitmapFamily, HasFlag, IntoOperations, Operation,
Serialize, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE,
assert::ToAssertValue, Batch, BatchBuilder, BitmapClass, HasFlag, IntoOperations, Operation,
Serialize, TagValue, ToBitmaps, ValueClass, ValueOp, F_BITMAP, F_CLEAR, F_INDEX, F_VALUE,
};
impl BatchBuilder {
@ -57,9 +55,7 @@ impl BatchBuilder {
// Add document id
self.ops.push(Operation::Bitmap {
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: vec![],
class: BitmapClass::DocumentIds,
set: true,
});
self
@ -73,9 +69,7 @@ impl BatchBuilder {
pub fn delete_document(&mut self, document_id: u32) -> &mut Self {
self.ops.push(Operation::DocumentId { document_id });
self.ops.push(Operation::Bitmap {
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: vec![],
class: BitmapClass::DocumentIds,
set: false,
});
self
@ -118,36 +112,55 @@ impl BatchBuilder {
if options.has_flag(F_VALUE) {
self.ops.push(Operation::Value {
class: ValueClass::Property { field, family: 0 },
set: if is_set { Some(value) } else { None },
class: ValueClass::Property(field),
op: if is_set {
ValueOp::Set(value)
} else {
ValueOp::Clear
},
});
}
self
}
pub fn bitmap(
pub fn tag(
&mut self,
field: impl Into<u8>,
value: impl BitmapFamily + Serialize,
value: impl Into<TagValue>,
options: u32,
) -> &mut Self {
self.ops.push(Operation::Bitmap {
family: value.family(),
field: field.into(),
key: value.serialize(),
class: BitmapClass::Tag {
field: field.into(),
value: value.into(),
},
set: !options.has_flag(F_CLEAR),
});
self
}
pub fn quota(&mut self, bytes: i64) -> &mut Self {
self.ops.push(Operation::UpdateQuota { bytes });
pub fn add(&mut self, class: impl Into<ValueClass>, value: i64) -> &mut Self {
self.ops.push(Operation::Value {
class: class.into(),
op: ValueOp::Add(value),
});
self
}
pub fn op(&mut self, op: Operation) -> &mut Self {
self.ops.push(op);
pub fn set(&mut self, class: impl Into<ValueClass>, value: Vec<u8>) -> &mut Self {
self.ops.push(Operation::Value {
class: class.into(),
op: ValueOp::Set(value),
});
self
}
pub fn clear(&mut self, class: impl Into<ValueClass>) -> &mut Self {
self.ops.push(Operation::Value {
class: class.into(),
op: ValueOp::Clear,
});
self
}
@ -166,6 +179,13 @@ impl BatchBuilder {
}
}
/// Returns the account id of the most recently pushed `AccountId`
/// operation in this batch, if any.
pub fn last_account_id(&self) -> Option<u32> {
    for op in self.ops.iter().rev() {
        if let Operation::AccountId { account_id } = op {
            return Some(*account_id);
        }
    }
    None
}
pub fn is_empty(&self) -> bool {
self.ops.is_empty()
|| !self.ops.iter().any(|op| {

View file

@ -21,14 +21,16 @@
* for more details.
*/
use std::convert::TryInto;
use std::{convert::TryInto, hash::Hasher};
use utils::codec::leb128::Leb128_;
use crate::{
AclKey, BitmapKey, CustomValueKey, Deserialize, Error, IndexKey, IndexKeyPrefix, Key, LogKey,
backend::MAX_TOKEN_MASK, BitmapKey, Deserialize, Error, IndexKey, IndexKeyPrefix, Key, LogKey,
ValueKey, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES,
};
use super::{BitmapClass, TagValue, ValueClass};
/// Incremental builder for serialized store keys; components are appended
/// through the `write*` helpers and the final byte buffer is produced by
/// `finalize`.
pub struct KeySerializer {
// Accumulated key bytes.
buf: Vec<u8>,
}
@ -142,29 +144,18 @@ impl DeserializeBigEndian for &[u8] {
}
}
impl ValueKey {
pub fn new(
impl<T: AsRef<ValueClass>> ValueKey<T> {
pub fn property(
account_id: u32,
collection: impl Into<u8>,
document_id: u32,
field: impl Into<u8>,
) -> Self {
) -> ValueKey<ValueClass> {
ValueKey {
account_id,
collection: collection.into(),
document_id,
family: 0,
field: field.into(),
}
}
pub fn term_index(account_id: u32, collection: impl Into<u8>, document_id: u32) -> Self {
ValueKey {
account_id,
collection: collection.into(),
document_id,
family: u8::MAX,
field: u8::MAX,
class: ValueClass::Property(field.into()),
}
}
@ -193,19 +184,6 @@ impl IndexKeyPrefix {
}
}
// Decodes a fully-materialized ACL key with layout
// [grant_account_id: u32][u8::MAX][to_account_id: u32][to_collection: u8][to_document_id: u32].
// NOTE(review): this appears to be the pre-refactor decoder that is replaced
// by the partial `AclKey` decoder further below — confirm before reuse.
impl Deserialize for AclKey {
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
Ok(AclKey {
grant_account_id: bytes.deserialize_be_u32(0)?,
// Skips the 4-byte grant account id plus the 1-byte marker.
to_account_id: bytes.deserialize_be_u32(std::mem::size_of::<u32>() + 1)?,
to_collection: *bytes
.get((std::mem::size_of::<u32>() * 2) + 1)
.ok_or_else(|| Error::InternalError(format!("Corrupted acl key {bytes:?}")))?,
to_document_id: bytes.deserialize_be_u32((std::mem::size_of::<u32>() * 2) + 2)?,
})
}
}
impl Key for LogKey {
fn subspace(&self) -> u8 {
SUBSPACE_LOGS
@ -226,7 +204,7 @@ impl Key for LogKey {
}
}
impl Key for ValueKey {
impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
fn subspace(&self) -> u8 {
SUBSPACE_VALUES
}
@ -234,64 +212,26 @@ impl Key for ValueKey {
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
let ks = {
if include_subspace {
KeySerializer::new(std::mem::size_of::<ValueKey>() + 2)
.write(crate::SUBSPACE_VALUES)
KeySerializer::new(self.len() + 2).write(crate::SUBSPACE_VALUES)
} else {
KeySerializer::new(std::mem::size_of::<ValueKey>() + 1)
KeySerializer::new(self.len() + 1)
}
};
match self.class.as_ref() {
ValueClass::Property(field) => ks
.write(self.account_id)
.write(self.collection)
.write_leb128(self.document_id)
.write(*field),
ValueClass::Acl(grant_account_id) => ks
.write(*grant_account_id)
.write(u8::MAX)
.write(self.account_id)
.write(self.collection)
.write(self.document_id),
ValueClass::Named(name) => ks.write(u32::MAX).write(name.as_slice()),
}
.write(self.account_id)
.write(self.collection)
.write_leb128(self.document_id);
if self.family == 0 {
ks.write(self.field).finalize()
} else {
ks.write(u8::MAX)
.write(self.family)
.write(self.field)
.finalize()
}
}
}
impl Key for CustomValueKey {
fn subspace(&self) -> u8 {
SUBSPACE_VALUES
}

// Serializes the raw custom key bytes, optionally prefixed with the
// values subspace byte.
// NOTE(review): buffer capacity is estimated from `size_of::<ValueKey>()`
// rather than `self.value.len()`, so long custom keys may reallocate —
// confirm whether this is intentional.
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
{
if include_subspace {
KeySerializer::new(std::mem::size_of::<ValueKey>() + 2)
.write(crate::SUBSPACE_VALUES)
} else {
KeySerializer::new(std::mem::size_of::<ValueKey>() + 1)
}
}
.write(&self.value[..])
.finalize()
}
}
impl Key for AclKey {
fn subspace(&self) -> u8 {
SUBSPACE_VALUES
}

// Layout: [grant_account_id][u8::MAX][to_account_id][to_collection][to_document_id].
// The u8::MAX byte presumably distinguishes ACL keys from other keys in
// the values subspace — confirm against the value-key layouts above.
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
{
if include_subspace {
KeySerializer::new(std::mem::size_of::<AclKey>() + 1).write(crate::SUBSPACE_VALUES)
} else {
KeySerializer::new(std::mem::size_of::<AclKey>())
}
}
.write(self.grant_account_id)
.write(u8::MAX)
.write(self.to_account_id)
.write(self.to_collection)
.write(self.to_document_id)
.finalize()
}
}
@ -320,27 +260,142 @@ impl<T: AsRef<[u8]> + Sync + Send> Key for IndexKey<T> {
}
}
impl<T: AsRef<[u8]> + Sync + Send> Key for BitmapKey<T> {
impl<T: AsRef<BitmapClass> + Sync + Send> Key for BitmapKey<T> {
fn subspace(&self) -> u8 {
SUBSPACE_BITMAPS
}
fn serialize(&self, include_subspace: bool) -> Vec<u8> {
let key = self.key.as_ref();
{
if include_subspace {
KeySerializer::new(std::mem::size_of::<BitmapKey<T>>() + key.len() + 1)
.write(crate::SUBSPACE_BITMAPS)
} else {
KeySerializer::new(std::mem::size_of::<BitmapKey<T>>() + key.len())
}
const BM_DOCUMENT_IDS: u8 = 0;
const BM_TAG: u8 = 1 << 5;
const BM_TEXT: u8 = 1 << 6;
const TAG_ID: u8 = 0;
const TAG_TEXT: u8 = 1 << 0;
const TAG_STATIC: u8 = 1 << 1;
let ks = if include_subspace {
KeySerializer::new(self.len() + 1).write(crate::SUBSPACE_BITMAPS)
} else {
KeySerializer::new(self.len())
}
.write(self.account_id)
.write(self.collection)
.write(self.family)
.write(self.field)
.write(key)
.write(self.collection);
match self.class.as_ref() {
BitmapClass::DocumentIds => ks.write(BM_DOCUMENT_IDS),
BitmapClass::Tag { field, value } => match value {
TagValue::Id(id) => ks.write(BM_TAG | TAG_ID).write(*field).write_leb128(*id),
TagValue::Text(text) => ks
.write(BM_TAG | TAG_TEXT)
.write(*field)
.write(text.as_slice()),
TagValue::Static(id) => ks.write(BM_TAG | TAG_STATIC).write(*field).write(*id),
},
BitmapClass::Text { field, token } => ks
.write(BM_TEXT | (token.len() & MAX_TOKEN_MASK) as u8)
.write(*field)
.hash_text(token),
}
.write(self.block_num)
.finalize()
}
}
// Fixed-seed hashers: key fingerprints must be stable across processes and
// restarts, so the randomly-seeded defaults cannot be used.
const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
0xaf1f2242106c64b3,
0x60ca4cfb4b3ed0ce,
0xc7dbc0bb615e82b3,
0x520ad065378daf88,
);
lazy_static::lazy_static! {
// Seeded SipHash instance, copied for each token fingerprint.
static ref SIPHASHER: siphasher::sip::SipHasher13 =
siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
}
impl KeySerializer {
// Appends a fixed-size fingerprint of `item` to the key buffer.
//
// Tokens of up to 8 bytes are stored verbatim; longer tokens are compressed
// into 8, 12 or 16 bytes assembled from four independent 64-bit hashes
// (xxh3, farmhash, ahash, siphash-1-3), trading key length for a low
// collision probability.
fn hash_text(mut self, item: impl AsRef<[u8]>) -> Self {
let item = item.as_ref();
if item.len() <= 8 {
// Short tokens fit verbatim; no hashing needed.
self.buf.extend_from_slice(item);
} else {
let h1 = xxhash_rust::xxh3::xxh3_64(item).to_le_bytes();
let h2 = farmhash::hash64(item).to_le_bytes();
let h3 = AHASHER.hash_one(item).to_le_bytes();
// SipHasher is stateful: copy the pre-seeded instance before writing.
let mut sh = *SIPHASHER;
sh.write(item.as_ref());
let h4 = sh.finish().to_le_bytes();
// Longer inputs take more prefix bytes from each hash.
match item.len() {
9..=16 => {
self.buf.extend_from_slice(&h1[..2]);
self.buf.extend_from_slice(&h2[..2]);
self.buf.extend_from_slice(&h3[..2]);
self.buf.extend_from_slice(&h4[..2]);
}
17..=32 => {
self.buf.extend_from_slice(&h1[..3]);
self.buf.extend_from_slice(&h2[..3]);
self.buf.extend_from_slice(&h3[..3]);
self.buf.extend_from_slice(&h4[..3]);
}
_ => {
self.buf.extend_from_slice(&h1[..4]);
self.buf.extend_from_slice(&h2[..4]);
self.buf.extend_from_slice(&h3[..4]);
self.buf.extend_from_slice(&h4[..4]);
}
}
}
self
}
}
impl<T: AsRef<BitmapClass>> BitmapKey<T> {
/// Estimated serialized length of this key, used to pre-size the
/// `KeySerializer` buffer when the key is built.
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
std::mem::size_of::<BitmapKey<BitmapClass>>()
+ match self.class.as_ref() {
// Document-id bitmaps carry no extra payload.
BitmapClass::DocumentIds => 0,
BitmapClass::Tag { value, .. } => match value {
TagValue::Id(_) => std::mem::size_of::<u32>(),
TagValue::Text(v) => v.len(),
TagValue::Static(_) => 1,
},
// Text tokens are written via `hash_text`, which emits at most
// `token.len()` bytes (verbatim for short tokens, a shorter
// fingerprint otherwise).
BitmapClass::Text { token, .. } => token.len(),
}
}
}
impl<T: AsRef<ValueClass>> ValueKey<T> {
/// Estimated serialized length of this key, used to pre-size the
/// `KeySerializer` buffer when the key is built.
#[allow(clippy::len_without_is_empty)]
pub fn len(&self) -> usize {
std::mem::size_of::<ValueKey<ValueClass>>()
+ match self.class.as_ref() {
// One extra byte for the property field id.
ValueClass::Property(_) => 1,
// ACL keys append the 4-byte grant account id.
ValueClass::Acl(_) => std::mem::size_of::<u32>(),
// Named keys append the raw name bytes.
ValueClass::Named(v) => v.len(),
}
}
}
// Partially-decoded ACL key: only the fields currently consumed are
// materialized; the remaining key components are kept below as commented-out
// reference. NOTE(review): confirm with callers that the grant account id
// and document id are indeed unused.
pub struct AclKey {
pub to_account_id: u32,
pub to_collection: u8,
//pub grant_account_id: u32,
//pub to_document_id: u32,
}
impl Deserialize for AclKey {
// Decodes an ACL key with layout
// [grant_account_id: u32][u8::MAX][to_account_id: u32][to_collection: u8][to_document_id: u32],
// extracting only the target account id and collection.
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
Ok(AclKey {
// Skips the 4-byte grant account id plus the 1-byte marker.
to_account_id: bytes.deserialize_be_u32(std::mem::size_of::<u32>() + 1)?,
to_collection: *bytes
.get((std::mem::size_of::<u32>() * 2) + 1)
.ok_or_else(|| Error::InternalError(format!("Corrupted acl key {bytes:?}")))?,
//grant_account_id: bytes.deserialize_be_u32(0)?,
//to_document_id: bytes.deserialize_be_u32((std::mem::size_of::<u32>() * 2) + 2)?,
})
}
}

View file

@ -26,9 +26,7 @@ use std::{collections::HashSet, slice::Iter, time::SystemTime};
use nlp::tokenizers::space::SpaceTokenizer;
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::{
fts::builder::MAX_TOKEN_LENGTH, Deserialize, Serialize, BM_TAG, HASH_EXACT, TAG_ID, TAG_STATIC,
};
use crate::{backend::MAX_TOKEN_LENGTH, Deserialize, Serialize};
use self::assert::AssertValue;
@ -68,7 +66,7 @@ pub enum Operation {
},
Value {
class: ValueClass,
set: Option<Vec<u8>>,
op: ValueOp,
},
Index {
field: u8,
@ -76,14 +74,9 @@ pub enum Operation {
set: bool,
},
Bitmap {
family: u8,
field: u8,
key: Vec<u8>,
class: BitmapClass,
set: bool,
},
UpdateQuota {
bytes: i64,
},
Log {
change_id: u64,
collection: u8,
@ -92,10 +85,62 @@ pub enum Operation {
}
/// Identifies which bitmap a `Bitmap` operation or `BitmapKey` targets.
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum BitmapClass {
/// The bitmap of all document ids in a collection.
DocumentIds,
/// A per-field tag bitmap.
Tag { field: u8, value: TagValue },
/// A per-field text-token bitmap.
Text { field: u8, token: Vec<u8> },
}
/// The value component of a tag bitmap key.
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum TagValue {
/// Numeric id (serialized as LEB128).
Id(u32),
/// Free-form text bytes.
Text(Vec<u8>),
/// Static single-byte id.
Static(u8),
}
/// Identifies which value a `Value` operation or `ValueKey` addresses.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum ValueClass {
    /// A property value of a document, keyed by field id.
    Property(u8),
    /// An ACL entry granted by the given account id.
    Acl(u32),
    /// A free-form named value.
    Named(Vec<u8>),
}
/// The mutation applied to a stored value.
#[derive(Debug, PartialEq, Eq, Hash, Default)]
pub enum ValueOp {
/// Store the given bytes.
Set(Vec<u8>),
/// Add a signed delta to the stored value (see `BatchBuilder::add`).
Add(i64),
/// Remove the value.
#[default]
Clear,
}
impl From<u32> for TagValue {
fn from(value: u32) -> Self {
TagValue::Id(value)
}
}
impl From<Vec<u8>> for TagValue {
fn from(value: Vec<u8>) -> Self {
TagValue::Text(value)
}
}
impl From<String> for TagValue {
fn from(value: String) -> Self {
TagValue::Text(value.into_bytes())
}
}
impl From<u8> for TagValue {
fn from(value: u8) -> Self {
TagValue::Static(value)
}
}
impl From<()> for TagValue {
fn from(_: ()) -> Self {
TagValue::Text(vec![])
}
}
impl Serialize for u32 {
@ -277,17 +322,40 @@ pub trait ToBitmaps {
fn to_bitmaps(&self, ops: &mut Vec<Operation>, field: u8, set: bool);
}
/// Splits text into a deduplicated set of tokens.
pub trait TokenizeText {
/// Inserts each token of `self` into `tokens`.
fn tokenize_into(&self, tokens: &mut HashSet<String>);
/// Returns a freshly allocated token set for `self`.
fn to_tokens(&self) -> HashSet<String>;
}
/// Tokenizes the string and emits one text-bitmap operation per unique token.
impl ToBitmaps for &str {
    fn to_bitmaps(&self, ops: &mut Vec<Operation>, field: u8, set: bool) {
        // Collect into a set first so repeated tokens yield one operation.
        let mut tokens = HashSet::new();
        self.tokenize_into(&mut tokens);
        ops.extend(tokens.into_iter().map(|token| Operation::Bitmap {
            class: BitmapClass::Text {
                field,
                token: token.into_bytes(),
            },
            set,
        }));
    }
}
impl TokenizeText for &str {
fn tokenize_into(&self, tokens: &mut HashSet<String>) {
for token in SpaceTokenizer::new(self, MAX_TOKEN_LENGTH) {
tokens.insert(token);
}
}
for token in tokens {
ops.push(Operation::hash(&token, HASH_EXACT, field, set));
}
fn to_tokens(&self) -> HashSet<String> {
let mut tokens = HashSet::new();
self.tokenize_into(&mut tokens);
tokens
}
}
@ -300,9 +368,10 @@ impl ToBitmaps for String {
impl ToBitmaps for u32 {
fn to_bitmaps(&self, ops: &mut Vec<Operation>, field: u8, set: bool) {
ops.push(Operation::Bitmap {
family: BM_TAG | TAG_ID,
field,
key: self.serialize(),
class: BitmapClass::Tag {
field,
value: TagValue::Id(*self),
},
set,
});
}
@ -311,9 +380,10 @@ impl ToBitmaps for u32 {
impl ToBitmaps for u64 {
fn to_bitmaps(&self, ops: &mut Vec<Operation>, field: u8, set: bool) {
ops.push(Operation::Bitmap {
family: BM_TAG | TAG_ID,
field,
key: (*self as u32).serialize(),
class: BitmapClass::Tag {
field,
value: TagValue::Id(*self as u32),
},
set,
});
}
@ -333,22 +403,6 @@ impl<T: ToBitmaps> ToBitmaps for Vec<T> {
}
}
// NOTE(review): maps tag values to their legacy bitmap-family byte; this
// appears superseded by `BitmapClass`/`TagValue` in the surrounding
// refactoring — confirm before further use.
pub trait BitmapFamily {
fn family(&self) -> u8;
}

// The unit value denotes a static tag.
impl BitmapFamily for () {
fn family(&self) -> u8 {
BM_TAG | TAG_STATIC
}
}

// A `u32` denotes a numeric-id tag.
impl BitmapFamily for u32 {
fn family(&self) -> u8 {
BM_TAG | TAG_ID
}
}
impl Serialize for () {
fn serialize(self) -> Vec<u8> {
Vec::with_capacity(0)
@ -368,8 +422,8 @@ pub trait IntoOperations {
impl Operation {
    /// Builds a value operation on the ACL entry granted by
    /// `grant_account_id`: `Some(bytes)` stores the ACL value,
    /// `None` removes it.
    pub fn acl(grant_account_id: u32, set: Option<Vec<u8>>) -> Self {
        Operation::Value {
            class: ValueClass::Acl(grant_account_id),
            op: set.map(ValueOp::Set).unwrap_or(ValueOp::Clear),
        }
    }
}
@ -380,3 +434,24 @@ pub fn now() -> u64 {
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs())
}
// Lets owned classes flow through APIs generic over `T: AsRef<ValueClass>`
// (e.g. `ValueKey<T>`).
impl AsRef<ValueClass> for ValueClass {
fn as_ref(&self) -> &ValueClass {
self
}
}
// Lets owned classes flow through APIs generic over `T: AsRef<BitmapClass>`
// (e.g. `BitmapKey<T>`).
impl AsRef<BitmapClass> for BitmapClass {
fn as_ref(&self) -> &BitmapClass {
self
}
}
impl BitmapClass {
    /// Shorthand for a tag bitmap class keyed by a numeric id.
    pub fn tag_id(property: impl Into<u8>, id: u32) -> Self {
        let field = property.into();
        let value = TagValue::Id(id);
        Self::Tag { field, value }
    }
}

View file

@ -198,7 +198,7 @@ impl SieveConnection {
Ok(Ok(Some(line))) => {
let is_done =
line.starts_with("OK") || line.starts_with("NO") || line.starts_with("BYE");
println!("<- {:?}", line);
//println!("<- {:?}", line);
lines.push(line);
if is_done {
return lines;
@ -216,13 +216,13 @@ impl SieveConnection {
}
pub async fn send(&mut self, text: &str) {
println!("-> {:?}", text);
//println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
self.writer.write_all(b"\r\n").await.unwrap();
}
pub async fn send_raw(&mut self, text: &str) {
println!("-> {:?}", text);
//println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
}

View file

@ -450,7 +450,7 @@ impl ImapConnection {
Type::Untagged | Type::Status => "* ",
Type::Continuation => "+ ",
});
println!("<- {:?}", line);
//println!("<- {:?}", line);
lines.push(line);
if is_done {
return lines;
@ -468,20 +468,20 @@ impl ImapConnection {
}
pub async fn send(&mut self, text: &str) {
println!("-> {}{:?}", std::str::from_utf8(self.tag).unwrap(), text);
//println!("-> {}{:?}", std::str::from_utf8(self.tag).unwrap(), text);
self.writer.write_all(self.tag).await.unwrap();
self.writer.write_all(text.as_bytes()).await.unwrap();
self.writer.write_all(b"\r\n").await.unwrap();
}
pub async fn send_untagged(&mut self, text: &str) {
println!("-> {:?}", text);
//println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
self.writer.write_all(b"\r\n").await.unwrap();
}
pub async fn send_raw(&mut self, text: &str) {
println!("-> {:?}", text);
//println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
}
}

View file

@ -93,7 +93,7 @@ pub async fn test(server: Arc<JMAP>, client: &mut Client) {
response.contains(&format!("{method} ({algo})")),
"got response {response}, expected {encryption} algo"
);
println!("response = {response}");
//println!("response = {response}");
}
}

View file

@ -389,7 +389,7 @@ impl SmtpConnection {
match tokio::time::timeout(Duration::from_millis(1500), self.reader.next_line()).await {
Ok(Ok(Some(line))) => {
let is_done = line.as_bytes()[3] == b' ';
println!("<- {:?}", line);
//println!("<- {:?}", line);
lines.push(line);
if is_done {
num_responses -= 1;
@ -419,13 +419,13 @@ impl SmtpConnection {
}
pub async fn send(&mut self, text: &str) {
println!("-> {:?}", text);
//println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
self.writer.write_all(b"\r\n").await.unwrap();
}
pub async fn send_raw(&mut self, text: &str) {
println!("-> {:?}", text);
//println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
}
}

View file

@ -499,7 +499,7 @@ pub fn spawn_mock_smtp_server() -> (mpsc::Receiver<MockMessage>, Arc<Mutex<MockS
.unwrap();
while rx.read_line(&mut buf).await.is_ok() {
print!("-> {}", buf);
//print!("-> {}", buf);
if buf.starts_with("EHLO") {
tx.write_all(b"250 Hi there, but I have no extensions to offer :-(\r\n")
.await
@ -590,7 +590,7 @@ pub fn spawn_mock_smtp_server() -> (mpsc::Receiver<MockMessage>, Arc<Mutex<MockS
pub async fn expect_message_delivery(event_rx: &mut mpsc::Receiver<MockMessage>) -> MockMessage {
match tokio::time::timeout(Duration::from_millis(3000), event_rx.recv()).await {
Ok(Some(message)) => {
println!("Got message [{}]", message.message);
//println!("Got message [{}]", message.message);
message
}

View file

@ -233,10 +233,10 @@ pub async fn jmap_tests() {
let delete = true;
let mut params = init_jmap_tests(delete).await;
//email_query::test(params.server.clone(), &mut params.client, delete).await;
/*email_query::test(params.server.clone(), &mut params.client, delete).await;
email_get::test(params.server.clone(), &mut params.client).await;
email_set::test(params.server.clone(), &mut params.client).await;
/*email_parse::test(params.server.clone(), &mut params.client).await;
email_parse::test(params.server.clone(), &mut params.client).await;
email_search_snippet::test(params.server.clone(), &mut params.client).await;
email_changes::test(params.server.clone(), &mut params.client).await;
email_query_changes::test(params.server.clone(), &mut params.client).await;
@ -253,10 +253,10 @@ pub async fn jmap_tests() {
sieve_script::test(params.server.clone(), &mut params.client).await;
vacation_response::test(params.server.clone(), &mut params.client).await;
email_submission::test(params.server.clone(), &mut params.client).await;
websocket::test(params.server.clone(), &mut params.client).await;
websocket::test(params.server.clone(), &mut params.client).await;*/
quota::test(params.server.clone(), &mut params.client).await;
crypto::test(params.server.clone(), &mut params.client).await;
blob::test(params.server.clone(), &mut params.client).await;*/
blob::test(params.server.clone(), &mut params.client).await;
if delete {
params.temp_dir.delete();

View file

@ -56,7 +56,7 @@ async fn ip_lookup_strategy() {
let _rx = start_test_server(core.into(), &[ServerProtocol::Smtp]);
for strategy in [IpLookupStrategy::Ipv6Only, IpLookupStrategy::Ipv6thenIpv4] {
println!("-> Strategy: {:?}", strategy);
//println!("-> Strategy: {:?}", strategy);
// Add mock DNS entries
let mut core = SMTP::test();
core.queue.config.ip_strategy = IfBlock::new(IpLookupStrategy::Ipv6thenIpv4);

View file

@ -27,7 +27,7 @@ use store::ahash::AHashSet;
use store::{write::BatchBuilder, Store};
pub async fn test(db: Arc<Store>) {
pub async fn test(db: Arc<impl Store>) {
println!("Running Store ID assignment tests...");
store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY
@ -42,7 +42,7 @@ pub async fn test(db: Arc<Store>) {
.store(60 * 60, std::sync::atomic::Ordering::Relaxed);
}
async fn test_1(db: Arc<Store>) {
async fn test_1(db: Arc<impl Store>) {
// Test change id assignment
let mut handles = Vec::new();
let mut expected_ids = HashSet::new();
@ -66,7 +66,7 @@ async fn test_1(db: Arc<Store>) {
db.destroy().await;
}
async fn test_2(db: Arc<Store>) {
async fn test_2(db: Arc<impl Store>) {
// Test document id assignment
for wait_for_expiry in [true, false] {
let mut handles = Vec::new();
@ -102,7 +102,7 @@ async fn test_2(db: Arc<Store>) {
db.destroy().await;
}
async fn test_3(db: Arc<Store>) {
async fn test_3(db: Arc<impl Store>) {
// Create document ids and try reassigning
let mut expected_ids = AHashSet::new();
let mut batch = BatchBuilder::new();
@ -133,7 +133,7 @@ async fn test_3(db: Arc<Store>) {
db.destroy().await;
}
async fn test_4(db: Arc<Store>) {
async fn test_4(db: Arc<impl Store>) {
// Try reassigning deleted ids
let mut expected_ids = AHashSet::new();
let mut batch = BatchBuilder::new();

View file

@ -28,10 +28,9 @@ use std::{
use jmap_proto::types::keyword::Keyword;
use nlp::language::Language;
use store::{ahash::AHashMap, query::sort::Pagination, StoreWrite};
use store::{ahash::AHashMap, query::sort::Pagination, write::ValueClass, StoreWrite};
use store::{
fts::builder::FtsIndexBuilder,
query::{Comparator, Filter},
write::{BatchBuilder, F_BITMAP, F_INDEX, F_VALUE},
Store, ValueKey,
@ -117,7 +116,7 @@ pub async fn test(db: Arc<impl Store + Send + 'static>, do_insert: bool) {
let documents = documents.clone();
s.spawn_fifo(move |_| {
let mut fts_builder = FtsIndexBuilder::with_default_language(Language::English);
/*let mut fts_builder = FtsIndexBuilder::with_default_language(Language::English);
let mut builder = BatchBuilder::new();
builder
.with_account_id(0)
@ -167,7 +166,7 @@ pub async fn test(db: Arc<impl Store + Send + 'static>, do_insert: bool) {
}
builder.custom(fts_builder);
documents.lock().unwrap().push(builder.build());
documents.lock().unwrap().push(builder.build());*/
});
}
});
@ -443,8 +442,7 @@ pub async fn test_sort(db: Arc<impl Store>) {
account_id: 0,
collection: COLLECTION_ID,
document_id: document_id as u32,
family: 0,
field: fields["accession_number"],
class: ValueClass::Property(fields["accession_number"])
})
.collect()
)