diff --git a/Cargo.toml b/Cargo.toml index 9588f406..4491e6f2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,11 @@ whatlang = "0.16" # Language detection rust-stemmers = "1.2" # Stemmers tinysegmenter = "0.1" # Japanese tokenizer jieba-rs = "0.6" # Chinese stemmer +xxhash-rust = { version = "0.8.5", features = ["xxh3"] } +farmhash = "1.1.5" +siphasher = "0.3" + +[dev-dependencies] +csv = "1.1" +flate2 = { version = "1.0.17", features = ["zlib"], default-features = false } +rayon = "1.5.1" diff --git a/pepe.toml b/pepe.toml deleted file mode 100644 index 5264166f..00000000 --- a/pepe.toml +++ /dev/null @@ -1,92 +0,0 @@ -I have the following SQLite table for storing email data: - -CREATE TABLE email ( - email_id INTEGER PRIMARY KEY, - blob_id TEXT NOT NULL, - thread_id INTEGER NOT NULL, - size INTEGER NOT NULL, - received_at TIMESTAMP NOT NULL, - message_id TEXT NOT NULL, - in_reply_to TEXT NOT NULL, - sender TEXT NOT NULL, - from TEXT NOT NULL, - to TEXT NOT NULL, - cc TEXT NOT NULL, - bcc TEXT NOT NULL, - reply_to TEXT NOT NULL, - subject TEXT NOT NULL, - sent_at TIMESTAMP NOT NULL, - has_attachment BOOL NOT NULL, - preview TEXT NOT NULL -); - -The mailboxes and keywords for each message are stored in separate tables: - -CREATE TABLE email_mailbox ( - email_id INTEGER PRIMARY KEY, - mailbox_id INTEGER NOT NULL, -) - -CREATE TABLE email_keyword ( - email_id INTEGER PRIMARY KEY, - keyword TEXT NOT NULL -); - -How would you write a SQLite query to list the email IDs of all the Emails with the subject "sales" sorted by messages that belong to the same Thread and have a the keyword "draft", then sorted by received at and then has attachment. - - -[email] -id: INT -blob_id: HASH -thread_id: INT -size: INT -received_at: TIMESTAMP -message_id: TEXT -in_reply_to: TEXT -sender: TEXT -from: TEXT -to: TEXT -cc: TEXT -bcc: TEXT -reply_to: TEXT -subject: TEXT -sent_at: TIMESTAMP -has_attachment: BOOL -preview: TEXT - -[email_mailbox] -email_id: INT -mailbox_id: INT -imap_uid: INT - -[email_keyword] -email_id: INT -keyword: TEXT - - -/* - - o id - o blobId - o threadId - o mailboxIds - o keywords - o size - o receivedAt - o messageId - o inReplyTo - o sender - o from - o to - o cc - o bcc - o replyTo - o subject - o sentAt - o hasAttachment - o preview - -[ "partId", "blobId", "size", "name", "type", "charset", - "disposition", "cid", "language", "location" ] - -*/ \ No newline at end of file diff --git a/src/backend/rocksdb/bitmap.rs b/src/backend/rocksdb/bitmap.rs index e355701a..5adb8cdc 100644 --- a/src/backend/rocksdb/bitmap.rs +++ b/src/backend/rocksdb/bitmap.rs @@ -226,7 +226,7 @@ pub fn bitmap_merge<'x>( deserialize_bitlist(&mut bm, op); } _ => { - debug_assert!(false, "This should not have happend"); + debug_assert!(false, "This should not have happened"); return None; } } diff --git a/src/backend/rocksdb/main.rs b/src/backend/rocksdb/main.rs index 750fcbb0..f5bc82e4 100644 --- a/src/backend/rocksdb/main.rs +++ b/src/backend/rocksdb/main.rs @@ -11,7 +11,7 @@ impl Store { pub fn open() -> crate::Result { // Create the database directory if it doesn't exist let path = PathBuf::from( - "/tmp/rocksdb.test", /*&settings + "/tmp/rocksdb_test", /*&settings .get("db-path") .unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/ ); diff --git a/src/backend/rocksdb/mod.rs b/src/backend/rocksdb/mod.rs index 47b544fe..d156208c 100644 --- a/src/backend/rocksdb/mod.rs +++ b/src/backend/rocksdb/mod.rs @@ -1,8 +1,11 @@ -use crate::{write::key::KeySerializer, BitmapKey, IndexKey, Serialize, 
ValueKey};
+use crate::{
+    write::key::KeySerializer, AclKey, BitmapKey, BlobKey, IndexKey, LogKey, Serialize, ValueKey,
+};
 
 pub mod bitmap;
 pub mod main;
 pub mod read;
+pub mod write;
 
 pub const CF_BITMAPS: &str = "b";
 pub const CF_VALUES: &str = "v";
@@ -15,13 +18,15 @@ pub const FIELD_PREFIX_LEN: usize = COLLECTION_PREFIX_LEN + std::mem::size_of::<
 pub const ACCOUNT_KEY_LEN: usize =
     std::mem::size_of::<u32>() + std::mem::size_of::<u8>() + std::mem::size_of::<u32>();
 
-impl Serialize for IndexKey<'_> {
+impl<T: AsRef<[u8]>> Serialize for IndexKey<T> {
     fn serialize(self) -> Vec<u8> {
-        KeySerializer::new(std::mem::size_of::<IndexKey>() + self.key.len())
+        let key = self.key.as_ref();
+        KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + key.len())
             .write(self.account_id)
             .write(self.collection)
             .write(self.field)
-            .write(self.key)
+            .write(key)
+            .write(self.document_id)
             .finalize()
     }
 }
@@ -37,14 +42,55 @@ impl Serialize for ValueKey {
     }
 }
 
-impl Serialize for BitmapKey<'_> {
+impl<T: AsRef<[u8]>> Serialize for BitmapKey<T> {
     fn serialize(self) -> Vec<u8> {
-        KeySerializer::new(std::mem::size_of::<BitmapKey>() + self.key.len())
-            .write(self.key)
-            .write(self.field)
+        let key = self.key.as_ref();
+        KeySerializer::new(std::mem::size_of::<BitmapKey<&[u8]>>() + key.len())
+            .write_leb128(self.account_id)
             .write(self.collection)
             .write(self.family)
-            .write_leb128(self.account_id)
+            .write(self.field)
+            .write(key)
             .finalize()
     }
 }
+
+impl<T: AsRef<[u8]>> Serialize for BlobKey<T> {
+    fn serialize(self) -> Vec<u8> {
+        let hash = self.hash.as_ref();
+        KeySerializer::new(std::mem::size_of::<BlobKey<&[u8]>>() + hash.len())
+            .write(hash)
+            .write_leb128(self.account_id)
+            .write(self.collection)
+            .write_leb128(self.document_id)
+            .finalize()
+    }
+}
+
+impl Serialize for AclKey {
+    fn serialize(self) -> Vec<u8> {
+        KeySerializer::new(std::mem::size_of::<AclKey>())
+            .write_leb128(self.grant_account_id)
+            .write(u8::MAX)
+            .write_leb128(self.to_account_id)
+            .write(self.to_collection)
+            .write_leb128(self.to_document_id)
+            .finalize()
+    }
+}
+
+impl Serialize for LogKey {
+    fn serialize(self) -> Vec<u8> {
+        KeySerializer::new(std::mem::size_of::<LogKey>())
+            .write(self.account_id)
+            .write(self.collection)
+            .write(self.change_id)
+            .finalize()
+    }
+}
+
+impl From<rocksdb::Error> for crate::Error {
+    fn from(value: rocksdb::Error) -> Self {
+        Self::InternalError(format!("RocksDB error: {}", value))
+    }
+}
diff --git a/src/backend/rocksdb/read.rs b/src/backend/rocksdb/read.rs
index b119a460..125650f4 100644
--- a/src/backend/rocksdb/read.rs
+++ b/src/backend/rocksdb/read.rs
@@ -4,21 +4,22 @@ use roaring::RoaringBitmap;
 use rocksdb::{Direction, IteratorMode};
 
 use crate::{
-    query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, IndexKey,
-    Serialize, Store, ValueKey, BM_DOCUMENT_IDS,
+    query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, Serialize,
+    Store, BM_DOCUMENT_IDS,
 };
 
 use super::{CF_BITMAPS, CF_INDEXES, CF_VALUES, FIELD_PREFIX_LEN};
 
 impl Store {
     #[inline(always)]
-    pub fn get_value<U>(&self, key: ValueKey) -> crate::Result<Option<U>>
+    pub fn get_value<U>(&self, key: impl Serialize) -> crate::Result<Option<U>>
     where
         U: Deserialize,
     {
+        let key = key.serialize();
         if let Some(bytes) = self
             .db
-            .get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key.serialize())
+            .get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key)
             .map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
         {
             Ok(Some(U::deserialize(&bytes).ok_or_else(|| {
@@ -30,14 +31,14 @@ impl Store {
     }
 
     #[inline(always)]
-    pub fn get_values<U>(&self, keys: Vec<ValueKey>) -> crate::Result<Vec<Option<U>>>
+    pub fn get_values<U>(&self, keys: Vec<impl Serialize>) -> crate::Result<Vec<Option<U>>>
     where
         U: Deserialize,
     {
         let cf_handle = self.db.cf_handle(CF_VALUES).unwrap();
         let mut results = Vec::with_capacity(keys.len());
         for value in self.db.multi_get_cf(
-            keys.iter()
+            keys.into_iter()
                 .map(|key| (&cf_handle, key.serialize()))
                 .collect::<Vec<_>>(),
         ) {
@@ -74,14 +75,18 @@ impl Store {
     }
 
     #[inline(always)]
-    pub fn get_bitmap(&self, key: BitmapKey) -> crate::Result<Option<RoaringBitmap>> {
+    pub fn get_bitmap<T: AsRef<[u8]>>(
+        &self,
+        key: BitmapKey<T>,
+    ) -> crate::Result<Option<RoaringBitmap>> {
+        let key = key.serialize();
         if let Some(bytes) = self
             .db
-            .get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), key.serialize())
+            .get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), &key)
             .map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
         {
             let bm = RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
-                Error::InternalError(format!("Failed to deserialize key: {:?}", key))
+                Error::InternalError(format!("Failed to deserialize key: {:?}", &key))
             })?;
             Ok(if !bm.is_empty() { Some(bm) } else { None })
         } else {
@@ -90,11 +95,11 @@ impl Store {
     }
 
     #[inline(always)]
-    fn get_bitmaps(&self, keys: Vec<BitmapKey>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
+    fn get_bitmaps<T: Serialize>(&self, keys: Vec<T>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
         let cf_handle = self.db.cf_handle(CF_BITMAPS).unwrap();
         let mut results = Vec::with_capacity(keys.len());
         for value in self.db.multi_get_cf(
-            keys.iter()
+            keys.into_iter()
                 .map(|key| (&cf_handle, key.serialize()))
                 .collect::<Vec<_>>(),
         ) {
@@ -116,9 +121,9 @@ impl Store {
         Ok(results)
     }
 
-    pub(crate) fn get_bitmaps_intersection(
+    pub(crate) fn get_bitmaps_intersection<T: Serialize>(
         &self,
-        keys: Vec<BitmapKey<'_>>,
+        keys: Vec<T>,
     ) -> crate::Result<Option<RoaringBitmap>> {
         let mut result: Option<RoaringBitmap> = None;
         for bitmap in self.get_bitmaps(keys)? {
@@ -138,9 +143,9 @@ impl Store {
         Ok(result)
     }
 
-    pub(crate) fn get_bitmaps_union(
+    pub(crate) fn get_bitmaps_union<T: Serialize>(
         &self,
-        keys: Vec<BitmapKey<'_>>,
+        keys: Vec<T>,
     ) -> crate::Result<Option<RoaringBitmap>> {
         let mut result: Option<RoaringBitmap> = None;
         for bitmap in (self.get_bitmaps(keys)?).into_iter().flatten() {
@@ -155,21 +160,20 @@ impl Store {
 
     pub(crate) fn range_to_bitmap(
         &self,
-        key: IndexKey<'_>,
+        match_key: &[u8],
+        match_value: &[u8],
         op: Operator,
     ) -> crate::Result<Option<RoaringBitmap>> {
         let mut bm = RoaringBitmap::new();
-        let match_key = key.serialize();
         let match_prefix = &match_key[0..FIELD_PREFIX_LEN];
-        let match_value = &match_key[FIELD_PREFIX_LEN..];
         for result in self.db.iterator_cf(
             &self.db.cf_handle(CF_INDEXES).unwrap(),
             IteratorMode::From(
-                &match_key,
+                match_key,
                 match op {
-                    Operator::GreaterThan => Direction::Forward,
-                    Operator::GreaterEqualThan => Direction::Forward,
-                    Operator::Equal => Direction::Forward,
+                    Operator::GreaterThan | Operator::GreaterEqualThan | Operator::Equal => {
+                        Direction::Forward
+                    }
                     _ => Direction::Reverse,
                 },
             ),
diff --git a/src/backend/rocksdb/write.rs b/src/backend/rocksdb/write.rs
new file mode 100644
index 00000000..1501963b
--- /dev/null
+++ b/src/backend/rocksdb/write.rs
@@ -0,0 +1,224 @@
+use std::time::Instant;
+
+use roaring::RoaringBitmap;
+use rocksdb::ErrorKind;
+
+use crate::{
+    write::{key::KeySerializer, Batch, Operation},
+    AclKey, BitmapKey, BlobKey, Deserialize, Error, IndexKey, LogKey, Serialize, Store, ValueKey,
+    BM_BLOOM, BM_DOCUMENT_IDS,
+};
+
+use super::{
+    bitmap::{clear_bit, set_bit},
+    CF_BITMAPS, CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES,
+};
+
+impl Store {
+    pub fn write(&self, batch: Batch) -> crate::Result<()> {
+        let cf_values = self.db.cf_handle(CF_VALUES).unwrap();
+        let cf_bitmaps = self.db.cf_handle(CF_BITMAPS).unwrap();
+        let cf_indexes = self.db.cf_handle(CF_INDEXES).unwrap();
+        let cf_logs = self.db.cf_handle(CF_LOGS).unwrap();
+        let cf_blobs = self.db.cf_handle(CF_BLOBS).unwrap();
+        let start = Instant::now();
+
+        loop {
+            let mut account_id = u32::MAX;
+            let mut collection = u8::MAX;
+            let mut document_id = u32::MAX;
+            let txn = self.db.transaction();
+            let mut wb = txn.get_writebatch();
+
+            for op in &batch.ops {
+                match op {
+                    Operation::AccountId {
+                        account_id: account_id_,
+                    } => {
+                        account_id = *account_id_;
+                    }
+                    Operation::Collection {
+                        collection: collection_,
+                    } => {
+                        collection = *collection_;
+                    }
+                    Operation::DocumentId {
+                        document_id: document_id_,
+                        set,
+                    } => {
+                        if *document_id_ == u32::MAX {
+                            let key = BitmapKey {
+                                account_id,
+                                collection,
+                                family: BM_DOCUMENT_IDS,
+                                field: u8::MAX,
+                                key: b"",
+                            }
+                            .serialize();
+                            let mut document_ids = if let Some(bytes) = txn
+                                .get_pinned_for_update_cf(&cf_bitmaps, &key, true)
+                                .map_err(|err| {
+                                    Error::InternalError(format!("get_cf failed: {}", err))
+                                })? {
+                                RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
+                                    Error::InternalError(format!(
+                                        "Failed to deserialize key: {:?}",
+                                        key
+                                    ))
+                                })?
+                            } else {
+                                RoaringBitmap::new()
+                            };
+                            document_id = if let Some(max_id) = document_ids.max() {
+                                let mask = if max_id < 20000 {
+                                    RoaringBitmap::from_sorted_iter(0..max_id + 2).unwrap()
+                                } else {
+                                    RoaringBitmap::full()
+                                };
+                                document_ids ^= mask;
+                                document_ids.min().unwrap()
+                            } else {
+                                0
+                            };
+                            wb.merge_cf(&cf_bitmaps, key, set_bit(document_id));
+                        } else {
+                            document_id = *document_id_;
+                            if !*set {
+                                wb.merge_cf(
+                                    &cf_bitmaps,
+                                    BitmapKey {
+                                        account_id,
+                                        collection,
+                                        family: BM_DOCUMENT_IDS,
+                                        field: u8::MAX,
+                                        key: b"",
+                                    }
+                                    .serialize(),
+                                    clear_bit(document_id),
+                                );
+                            }
+                        }
+                    }
+                    Operation::Value { field, set } => {
+                        let key = ValueKey {
+                            account_id,
+                            collection,
+                            document_id,
+                            field: *field,
+                        }
+                        .serialize();
+                        if let Some(value) = set {
+                            wb.put_cf(&cf_values, key, value);
+                        } else {
+                            wb.delete_cf(&cf_values, key);
+                        }
+                    }
+                    Operation::Index { field, key, set } => {
+                        let key_ = IndexKey {
+                            account_id,
+                            collection,
+                            document_id,
+                            field: *field,
+                            key,
+                        }
+                        .serialize();
+                        if *set {
+                            wb.put_cf(&cf_indexes, key_, []);
+                        } else {
+                            wb.delete_cf(&cf_indexes, key_);
+                        }
+                    }
+                    Operation::Bitmap {
+                        family,
+                        field,
+                        key,
+                        set,
+                    } => {
+                        let key = BitmapKey {
+                            account_id,
+                            collection,
+                            family: *family,
+                            field: *field,
+                            key,
+                        }
+                        .serialize();
+                        let value = if *set {
+                            set_bit(document_id)
+                        } else {
+                            clear_bit(document_id)
+                        };
+                        wb.merge_cf(&cf_bitmaps, key, value);
+                    }
+                    Operation::Bloom { family, field, set } => {
+                        let key = KeySerializer::new(std::mem::size_of::<ValueKey>())
+                            .write_leb128(account_id)
+                            .write(collection)
+                            .write_leb128(document_id)
+                            .write(u8::MAX)
+                            .write(BM_BLOOM | *family)
+                            .write(*field)
+                            .finalize();
+                        if let Some(value) = set {
+                            wb.put_cf(&cf_values, key, value);
+                        } else {
+                            wb.delete_cf(&cf_values, key);
+                        }
+                    }
+                    Operation::Blob { key, set } => {
+                        let key = BlobKey {
+                            account_id,
+                            collection,
+                            document_id,
+                            hash: key,
+                        }
+                        .serialize();
+                        if *set {
+                            wb.put_cf(&cf_blobs, key, []);
+                        } else {
+                            wb.delete_cf(&cf_blobs, key);
+                        }
+                    }
+                    Operation::Acl {
+                        grant_account_id,
+                        set,
+                    } => {
+                        let key = AclKey {
+                            grant_account_id: *grant_account_id,
+                            to_account_id: account_id,
+                            to_collection: collection,
+                            to_document_id: document_id,
+                        }
+                        .serialize();
+                        if let Some(value) = set {
+                            wb.put_cf(&cf_values, key, value);
+                        } else {
+                            wb.delete_cf(&cf_values, key);
+                        }
+                    }
+                    Operation::Log { change_id, changes } => {
+                        let key = LogKey {
+                            account_id,
+                            collection,
+                            change_id: *change_id,
+                        }
+                        .serialize();
+                        wb.put_cf(&cf_logs, key, changes);
+                    }
+                }
+            }
+
+            match self.db.write(wb) {
+                Ok(_) => {
+                    //println!("Success with id {}", document_id);
+                    return Ok(());
+                }
+                Err(err) => match err.kind() {
+                    ErrorKind::Busy | ErrorKind::MergeInProgress | ErrorKind::TryAgain
+                        if start.elapsed().as_secs() < 5 => {}
+                    _ => return Err(err.into()),
+                },
+            }
+        }
+    }
+}
diff --git a/src/fts/bloom.rs b/src/fts/bloom.rs
new file mode 100644
index 00000000..bb613bc3
--- /dev/null
+++ b/src/fts/bloom.rs
@@ -0,0 +1,184 @@
+use std::{
+    borrow::Cow,
+    f64::consts::LN_2,
+    hash::{Hash, Hasher},
+};
+
+use roaring::RoaringBitmap;
+use utils::codec::leb128::{Leb128Reader, Leb128Vec};
+
+use crate::{Deserialize, Serialize};
+
+use super::{stemmer::StemmedToken, tokenizers::Token};
+
+pub struct BloomFilter {
+    m: u64,
+    b: RoaringBitmap,
+}
+
+#[derive(Debug)]
+pub struct BloomHash {
+    pub h: [u64; 7],
+}
+
+#[derive(Debug)]
+pub struct BloomHashGroup {
+    pub h1: BloomHash,
+    pub h2: Option<BloomHash>,
+}
+
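+// Seven probe positions are derived per key: four independent 64-bit hashes
+// (xxh3, farmhash, ahash and SipHash-1-3, each seeded with fixed constants so
+// stored filters stay stable across runs) plus three XOR combinations of
+// adjacent hashes.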
+const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
+    0xaf1f2242106c64b3,
+    0x60ca4cfb4b3ed0ce,
+    0xc7dbc0bb615e82b3,
+    0x520ad065378daf88,
+);
+lazy_static::lazy_static! {
+    static ref SIPHASHER: siphasher::sip::SipHasher13 =
+        siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
+}
+
+const P: f64 = 0.01;
+
+impl BloomFilter {
+    pub fn new(items: usize) -> Self {
+        Self {
+            m: if items > 0 {
+                std::cmp::max(Self::estimate_m(items, P), 10240)
+            } else {
+                0
+            },
+            b: RoaringBitmap::new(),
+        }
+    }
+
+    fn from_params(m: u64, b: RoaringBitmap) -> Self {
+        Self { m, b }
+    }
+
+    fn estimate_m(n: usize, p: f64) -> u64 {
+        (((n as f64) * f64::ln(p) / (-8.0 * LN_2.powi(2))).ceil() as u64) * 8
+    }
+
+    #[allow(dead_code)]
+    fn estimate_k(m: u64, n: usize) -> u32 {
+        std::cmp::max(((m as f64) / (n as f64) * f64::ln(2.0f64)).ceil() as u32, 1)
+    }
+
+    pub fn insert(&mut self, hash: &BloomHash) {
+        self.b.insert((hash.h[0] % self.m) as u32);
+        self.b.insert((hash.h[1] % self.m) as u32);
+        self.b.insert((hash.h[2] % self.m) as u32);
+        self.b.insert((hash.h[3] % self.m) as u32);
+        self.b.insert((hash.h[4] % self.m) as u32);
+        self.b.insert((hash.h[5] % self.m) as u32);
+        self.b.insert((hash.h[6] % self.m) as u32);
+    }
+
+    pub fn contains(&self, hash: &BloomHash) -> bool {
+        self.b.contains((hash.h[0] % self.m) as u32)
+            && self.b.contains((hash.h[1] % self.m) as u32)
+            && self.b.contains((hash.h[2] % self.m) as u32)
+            && self.b.contains((hash.h[3] % self.m) as u32)
+            && self.b.contains((hash.h[4] % self.m) as u32)
+            && self.b.contains((hash.h[5] % self.m) as u32)
+            && self.b.contains((hash.h[6] % self.m) as u32)
+    }
+
+    pub fn is_subset(&self, other: &Self) -> bool {
+        self.b.is_subset(&other.b)
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.m == 0 || self.b.is_empty()
+    }
+}
+
+impl BloomHash {
+    pub fn hash<T: Hash + AsRef<[u8]> + ?Sized>(item: &T) -> Self {
+        let h1 = xxhash_rust::xxh3::xxh3_64(item.as_ref());
+        let h2 = farmhash::hash64(item.as_ref());
+        /*let h2 = naive_cityhash::cityhash64_with_seeds(
+            item.as_ref(),
+            0x99693e7c5b56f555,
+            0x34809fd70b6ebf45,
+        );*/
+        let h3 = AHASHER.hash_one(item);
+        let mut sh = *SIPHASHER;
+        sh.write(item.as_ref());
+        let h4 = sh.finish();
+
+        Self {
+            h: [h1, h2, h3, h4, h1 ^ h2, h2 ^ h3, h3 ^ h4],
+        }
+    }
+}
+
+impl From<&str> for BloomHash {
+    fn from(s: &str) -> Self {
+        Self::hash(&s)
+    }
+}
+
+impl From<String> for BloomHash {
+    fn from(s: String) -> Self {
+        Self::hash(&s)
+    }
+}
+
+impl From<&String> for BloomHash {
+    fn from(s: &String) -> Self {
+        Self::hash(&s)
+    }
+}
+
+impl From<Cow<'_, str>> for BloomHash {
+    fn from(s: Cow<'_, str>) -> Self {
+        Self::hash(s.as_ref())
+    }
+}
+
+impl From<Token<'_>> for BloomHashGroup {
+    fn from(t: Token<'_>) -> Self {
+        Self {
+            h1: BloomHash::hash(t.word.as_ref()),
+            h2: None,
+        }
+    }
+}
+
+impl From<StemmedToken<'_>> for BloomHashGroup {
+    fn from(t: StemmedToken<'_>) -> Self {
+        Self {
+            h1: BloomHash::hash(t.word.as_ref()),
+            h2: t.stemmed_word.map(|w| BloomHash::hash(&format!("{w}_"))),
+        }
+    }
+}
+
+impl From<Cow<'_, str>> for BloomHashGroup {
+    fn from(t: Cow<'_, str>) -> Self {
+        Self {
+            h1: BloomHash::hash(t.as_ref()),
+            h2: None,
+        }
+    }
+}
+
+impl Serialize for BloomFilter {
+    fn serialize(self) -> Vec<u8> {
+        let mut buf = Vec::with_capacity(std::mem::size_of::<u64>() + self.b.serialized_size());
+        buf.push_leb128(self.m);
+        let _ = self.b.serialize_into(&mut buf);
+        buf
+    }
+}
+
+impl Deserialize for BloomFilter {
+    fn deserialize(bytes: &[u8]) -> Option<Self> {
+        let (m, pos) = bytes.read_leb128()?;
+        let b =
RoaringBitmap::deserialize_unchecked_from(bytes.get(pos..)?).ok()?; + + Some(Self::from_params(m, b)) + } +} diff --git a/src/fts/index.rs b/src/fts/builder.rs similarity index 53% rename from src/fts/index.rs rename to src/fts/builder.rs index c7c1b71d..64aabd4a 100644 --- a/src/fts/index.rs +++ b/src/fts/builder.rs @@ -1,37 +1,37 @@ -use std::collections::HashSet; +use std::borrow::Cow; use ahash::AHashSet; use crate::{ - write::{BatchBuilder, IntoOperations, Operation, Tokenize}, - Error, Serialize, BM_TERM, TERM_EXACT, TERM_STEMMED, + write::{BatchBuilder, IntoOperations, Operation}, + Serialize, BLOOM_BIGRAM, BLOOM_STEMMED, BLOOM_TRIGRAM, BM_BLOOM, }; use super::{ + bloom::{BloomFilter, BloomHash}, lang::{LanguageDetector, MIN_LANGUAGE_SCORE}, + ngram::ToNgrams, stemmer::Stemmer, - term_index::{TermIndexBuilder, TokenIndex}, Language, }; -pub const MAX_TOKEN_LENGTH: usize = 25; +pub const MAX_TOKEN_LENGTH: usize = 50; struct Text<'x> { field: u8, - text: &'x str, + text: Cow<'x, str>, language: Language, - part_id: u32, } -pub struct IndexBuilder<'x> { +pub struct FtsIndexBuilder<'x> { parts: Vec>, detect: LanguageDetector, default_language: Language, } -impl<'x> IndexBuilder<'x> { - pub fn with_default_language(default_language: Language) -> IndexBuilder<'x> { - IndexBuilder { +impl<'x> FtsIndexBuilder<'x> { + pub fn with_default_language(default_language: Language) -> FtsIndexBuilder<'x> { + FtsIndexBuilder { parts: vec![], detect: LanguageDetector::new(), default_language, @@ -41,30 +41,27 @@ impl<'x> IndexBuilder<'x> { pub fn index( &mut self, field: impl Into, - text: &'x str, + text: impl Into>, mut language: Language, - part_id: u32, ) { + let text = text.into(); if language == Language::Unknown { - language = self.detect.detect(text, MIN_LANGUAGE_SCORE); + language = self.detect.detect(&text, MIN_LANGUAGE_SCORE); } self.parts.push(Text { field: field.into(), text, language, - part_id, }); } } -impl<'x> IntoOperations for IndexBuilder<'x> { +impl<'x> IntoOperations for FtsIndexBuilder<'x> { fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> { let default_language = self .detect .most_frequent_language() .unwrap_or(self.default_language); - let mut term_index = TermIndexBuilder::new(); - let mut words = HashSet::new(); for part in &self.parts { let language = if part.language != Language::Unknown { @@ -72,44 +69,58 @@ impl<'x> IntoOperations for IndexBuilder<'x> { } else { default_language }; + let mut unique_words = AHashSet::new(); + let mut phrase_words = Vec::new(); - let mut terms = Vec::new(); - - for token in Stemmer::new(part.text, language, MAX_TOKEN_LENGTH) { - words.insert((token.word.as_bytes().to_vec(), part.field, true)); - + for token in Stemmer::new(&part.text, language, MAX_TOKEN_LENGTH).collect::>() { + unique_words.insert(token.word.to_string()); if let Some(stemmed_word) = token.stemmed_word.as_ref() { - words.insert((stemmed_word.as_bytes().to_vec(), part.field, false)); + unique_words.insert(format!("{}_", stemmed_word)); } - - terms.push(term_index.add_stemmed_token(token)); + phrase_words.push(token.word); } - if !terms.is_empty() { - term_index.add_terms(part.field, part.part_id, terms); + let mut bloom_stemmed = BloomFilter::new(unique_words.len()); + for word in unique_words { + let hash = BloomHash::from(word); + bloom_stemmed.insert(&hash); + //for h in [0, 1] { + batch.ops.push(Operation::Bitmap { + family: BM_BLOOM, + field: part.field, + key: hash.as_high_rank_hash(0).serialize(), + set: true, + }); + //} } - } - for (key, field, 
is_exact) in words { - batch.ops.push(Operation::Bitmap { - family: BM_TERM | if is_exact { TERM_EXACT } else { TERM_STEMMED }, - field, - key, - set: true, + batch.ops.push(Operation::Bloom { + field: part.field, + family: BLOOM_STEMMED, + set: bloom_stemmed.serialize().into(), }); - } - if !term_index.is_empty() { - batch.ops.push(Operation::Value { - field: u8::MAX, - set: term_index.serialize().into(), - }); + if phrase_words.len() > 1 { + batch.ops.push(Operation::Bloom { + field: part.field, + family: BLOOM_BIGRAM, + set: BloomFilter::to_ngrams(&phrase_words, 2).serialize().into(), + }); + if phrase_words.len() > 2 { + batch.ops.push(Operation::Bloom { + field: part.field, + family: BLOOM_TRIGRAM, + set: BloomFilter::to_ngrams(&phrase_words, 3).serialize().into(), + }); + } + } } Ok(()) } } +/* impl IntoOperations for TokenIndex { fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> { let mut tokens = AHashSet::new(); @@ -149,9 +160,4 @@ impl IntoOperations for TokenIndex { Ok(()) } } - -impl Tokenize for TermIndexBuilder { - fn tokenize(&self) -> HashSet> { - unreachable!() - } -} +*/ diff --git a/src/fts/mod.rs b/src/fts/mod.rs index 1e1c46fe..6c14e863 100644 --- a/src/fts/mod.rs +++ b/src/fts/mod.rs @@ -23,12 +23,17 @@ pub mod lang; //pub mod pdf; -pub mod index; +pub mod bloom; +pub mod builder; +pub mod ngram; +pub mod query; pub mod search_snippet; pub mod stemmer; pub mod term_index; pub mod tokenizers; +pub const HIGH_RANK_MOD: u64 = 10_240; + #[derive(Debug, PartialEq, Clone, Copy, Hash, Eq, serde::Serialize, serde::Deserialize)] pub enum Language { Esperanto = 0, diff --git a/src/fts/ngram.rs b/src/fts/ngram.rs new file mode 100644 index 00000000..8015145b --- /dev/null +++ b/src/fts/ngram.rs @@ -0,0 +1,38 @@ +use std::borrow::Cow; + +use super::bloom::{BloomFilter, BloomHashGroup}; + +pub trait ToNgrams: Sized { + fn new(items: usize) -> Self; + fn insert(&mut self, item: &str); + fn to_ngrams(tokens: &[Cow<'_, str>], n: usize) -> Self { + let mut filter = Self::new(tokens.len().saturating_sub(1)); + for words in tokens.windows(n) { + filter.insert(&words.join(" ")); + } + filter + } +} + +impl ToNgrams for BloomFilter { + fn new(items: usize) -> Self { + BloomFilter::new(items) + } + + fn insert(&mut self, item: &str) { + self.insert(&item.into()) + } +} + +impl ToNgrams for Vec { + fn new(items: usize) -> Self { + Vec::with_capacity(items) + } + + fn insert(&mut self, item: &str) { + self.push(BloomHashGroup { + h1: item.into(), + h2: None, + }) + } +} diff --git a/src/fts/query.rs b/src/fts/query.rs new file mode 100644 index 00000000..bfc51924 --- /dev/null +++ b/src/fts/query.rs @@ -0,0 +1,204 @@ +use std::time::Instant; + +use roaring::RoaringBitmap; + +use crate::{ + fts::{ + bloom::{BloomFilter, BloomHash, BloomHashGroup}, + builder::MAX_TOKEN_LENGTH, + ngram::ToNgrams, + stemmer::Stemmer, + tokenizers::Tokenizer, + }, + write::key::KeySerializer, + BitmapKey, Store, ValueKey, BLOOM_BIGRAM, BLOOM_STEMMED, BLOOM_TRIGRAM, BM_BLOOM, +}; + +use super::{Language, HIGH_RANK_MOD}; + +impl Store { + pub(crate) fn fts_query( + &self, + account_id: u32, + collection: u8, + field: u8, + text: &str, + language: Language, + match_phrase: bool, + ) -> crate::Result> { + let real_now = Instant::now(); + + let (bitmaps, hashes, family) = if match_phrase { + let mut tokens = Vec::new(); + let mut bit_keys = Vec::new(); + for token in Tokenizer::new(text, language, MAX_TOKEN_LENGTH) { + let hash = BloomHash::from(token.word.as_ref()); + let key = 
hash.to_high_rank_key(account_id, collection, field, 0); + if !bit_keys.contains(&key) { + bit_keys.push(key); + } + + tokens.push(token.word); + } + let bitmaps = match self.get_bitmaps_intersection(bit_keys)? { + Some(b) if !b.is_empty() => b, + _ => return Ok(None), + }; + + match tokens.len() { + 0 => (bitmaps, vec![], BLOOM_STEMMED), + 1 => ( + bitmaps, + vec![tokens.into_iter().next().unwrap().into()], + BLOOM_STEMMED, + ), + 2 => ( + bitmaps, + >::to_ngrams(&tokens, 2), + BLOOM_BIGRAM, + ), + _ => ( + bitmaps, + >::to_ngrams(&tokens, 3), + BLOOM_TRIGRAM, + ), + } + } else { + let mut hashes = Vec::new(); + let mut bitmaps = RoaringBitmap::new(); + + for token in Stemmer::new(text, language, MAX_TOKEN_LENGTH) { + let hash = BloomHashGroup { + h2: if let Some(stemmed_word) = token.stemmed_word { + Some(format!("{stemmed_word}_").into()) + } else { + Some(format!("{}_", token.word).into()) + }, + h1: token.word.into(), + }; + + match self.get_bitmaps_union(vec![ + hash.h1.to_high_rank_key(account_id, collection, field, 0), + hash.h2 + .as_ref() + .unwrap() + .to_high_rank_key(account_id, collection, field, 0), + ])? { + Some(b) if !b.is_empty() => { + if !bitmaps.is_empty() { + bitmaps &= b; + if bitmaps.is_empty() { + return Ok(None); + } + } else { + bitmaps = b; + } + } + _ => return Ok(None), + }; + + hashes.push(hash); + } + + (bitmaps, hashes, BLOOM_STEMMED) + }; + + let b_count = bitmaps.len(); + let mut bm = RoaringBitmap::new(); + + /*let keys = bitmaps + .iter() + .map(|document_id| { + KeySerializer::new(std::mem::size_of::()) + .write_leb128(account_id) + .write(collection) + .write_leb128(document_id) + .write(u8::MAX) + .write(BM_BLOOM | family) + .write(field) + .finalize() + }) + .collect::>(); + + self.get_values::(keys)? + .into_iter() + .zip(bitmaps) + .for_each(|(bloom, document_id)| { + if let Some(bloom) = bloom { + if !bloom.is_empty() { + let mut matched = true; + for hash in &hashes { + if !(bloom.contains(&hash.h1) + || hash.h2.as_ref().map_or(false, |h2| bloom.contains(h2))) + { + matched = false; + break; + } + } + + if matched { + bm.insert(document_id); + } + } + } + });*/ + for document_id in bitmaps { + let key = KeySerializer::new(std::mem::size_of::() + 2) + .write_leb128(account_id) + .write(collection) + .write_leb128(document_id) + .write(u8::MAX) + .write(BM_BLOOM | family) + .write(field) + .finalize(); + + if let Some(bloom) = self.get_value::(key)? 
{ + if !bloom.is_empty() { + let mut matched = true; + for hash in &hashes { + if !(bloom.contains(&hash.h1) + || hash.h2.as_ref().map_or(false, |h2| bloom.contains(h2))) + { + matched = false; + break; + } + } + + if matched { + bm.insert(document_id); + } + } + } + } + + println!( + "bloom_match {b_count} items in {:?}ms", + real_now.elapsed().as_millis() + ); + + Ok(Some(bm)) + } +} + +impl BloomHash { + #[inline(always)] + pub fn as_high_rank_hash(&self, n: usize) -> u16 { + (self.h[n] % HIGH_RANK_MOD) as u16 + } + + pub fn to_high_rank_key( + &self, + account_id: u32, + collection: u8, + field: u8, + n: usize, + ) -> Vec { + KeySerializer::new(std::mem::size_of::>() + 2) + .write_leb128(account_id) + .write(collection) + .write(BM_BLOOM) + .write(field) + .write(self.as_high_rank_hash(n)) + .finalize() + } +} diff --git a/src/lib.rs b/src/lib.rs index 1c4797aa..c3f52dc8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,9 @@ pub mod fts; pub mod query; pub mod write; +#[cfg(test)] +pub mod tests; + pub struct Store { db: OptimisticTransactionDB, } @@ -18,20 +21,21 @@ pub trait Serialize { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct BitmapKey<'x> { +pub struct BitmapKey> { pub account_id: u32, pub collection: u8, pub family: u8, pub field: u8, - pub key: &'x [u8], + pub key: T, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub struct IndexKey<'x> { +pub struct IndexKey> { pub account_id: u32, pub collection: u8, + pub document_id: u32, pub field: u8, - pub key: &'x [u8], + pub key: T, } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -42,8 +46,32 @@ pub struct ValueKey { pub field: u8, } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct BlobKey> { + pub account_id: u32, + pub collection: u8, + pub document_id: u32, + pub hash: T, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct AclKey { + pub grant_account_id: u32, + pub to_account_id: u32, + pub to_collection: u8, + pub to_document_id: u32, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct LogKey { + pub account_id: u32, + pub collection: u8, + pub change_id: u64, +} + pub type Result = std::result::Result; +#[derive(Debug)] pub enum Error { NotFound, InternalError(String), @@ -52,6 +80,11 @@ pub enum Error { pub const BM_DOCUMENT_IDS: u8 = 0; pub const BM_TERM: u8 = 0x10; pub const BM_TAG: u8 = 0x20; +pub const BM_BLOOM: u8 = 0x40; + +pub const BLOOM_STEMMED: u8 = 0x00; +pub const BLOOM_BIGRAM: u8 = 0x01; +pub const BLOOM_TRIGRAM: u8 = 0x02; pub const TERM_EXACT: u8 = 0x00; pub const TERM_STEMMED: u8 = 0x01; @@ -61,31 +94,3 @@ pub const TERM_HASH: u8 = 0x04; pub const TAG_ID: u8 = 0x00; pub const TAG_TEXT: u8 = 0x01; pub const TAG_STATIC: u8 = 0x02; - -#[cfg(test)] -mod tests { - use rand::Rng; - use roaring::RoaringBitmap; - - use super::*; - - #[test] - fn it_works() { - let mut rb1 = RoaringBitmap::new(); - let mut rb2 = RoaringBitmap::new(); - let total = rand::thread_rng().gen_range(0..100000); - println!("total: {}", total); - - for num in 0..total { - rb1.insert(rand::thread_rng().gen_range(0..u32::MAX)); - rb2.insert(num); - } - - println!("sparse: {}", rb1.serialized_size()); - println!("compact: {}", rb2.serialized_size()); - println!( - "ratio: {}", - rb1.serialized_size() as f64 / rb2.serialized_size() as f64 - ); - } -} diff --git a/src/query/filter.rs b/src/query/filter.rs index 65fa41ad..9702f34a 100644 --- a/src/query/filter.rs +++ b/src/query/filter.rs @@ -1,17 +1,10 @@ -use std::{ - borrow::Cow, - ops::{BitAndAssign, 
BitOrAssign, BitXorAssign}, -}; +use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign}; -use ahash::AHashSet; use roaring::RoaringBitmap; use crate::{ - fts::{ - index::MAX_TOKEN_LENGTH, stemmer::Stemmer, term_index::TermIndex, tokenizers::Tokenizer, - }, - write::Tokenize, - BitmapKey, Error, IndexKey, Store, ValueKey, BM_TERM, TERM_EXACT, TERM_STEMMED, + write::{key::KeySerializer, Tokenize}, + BitmapKey, IndexKey, Store, BM_TERM, TERM_EXACT, }; use super::{Filter, ResultSet}; @@ -31,6 +24,13 @@ impl Store { let document_ids = self .get_document_ids(account_id, collection)? .unwrap_or_else(RoaringBitmap::new); + if filters.is_empty() { + return Ok(ResultSet { + results: document_ids.clone(), + document_ids, + }); + } + let mut state: State = Filter::And.into(); let mut stack = Vec::new(); let mut filters = filters.into_iter().peekable(); @@ -62,7 +62,7 @@ impl Store { collection, family: BM_TERM | TERM_EXACT, field, - key, + key: key.as_bytes(), }) .collect(), )?, @@ -70,17 +70,16 @@ impl Store { ); } Filter::MatchValue { field, op, value } => { + let key = + KeySerializer::new(std::mem::size_of::>() + value.len()) + .write(account_id) + .write(collection) + .write(field) + .write(&value[..]) + .finalize(); state.op.apply( &mut state.bm, - self.range_to_bitmap( - IndexKey { - account_id, - collection, - field, - key: &value, - }, - op, - )?, + self.range_to_bitmap(&key, &value, op)?, &document_ids, ); } @@ -90,110 +89,18 @@ impl Store { language, match_phrase, } => { - if match_phrase { - let phrase = Tokenizer::new(&text, language, MAX_TOKEN_LENGTH) - .map(|token| token.word) - .collect::>(); - let mut keys = Vec::with_capacity(phrase.len()); - - for word in &phrase { - let key = BitmapKey { - account_id, - collection, - family: BM_TERM | TERM_EXACT, - field, - key: word.as_bytes(), - }; - if !keys.contains(&key) { - keys.push(key); - } - } - - // Retrieve the Term Index for each candidate and match the exact phrase - if let Some(candidates) = self.get_bitmaps_intersection(keys)? { - let mut results = RoaringBitmap::new(); - for document_id in candidates.iter() { - if let Some(term_index) = self.get_value::(ValueKey { - account_id, - collection, - document_id, - field: u8::MAX, - })? { - if term_index - .match_terms( - &phrase - .iter() - .map(|w| term_index.get_match_term(w, None)) - .collect::>(), - None, - true, - false, - false, - ) - .map_err(|e| { - Error::InternalError(format!( - "Corrupted TermIndex for {}: {:?}", - document_id, e - )) - })? 
- .is_some() - { - results.insert(document_id); - } - } - } - state.op.apply(&mut state.bm, results.into(), &document_ids); - } else { - state.op.apply(&mut state.bm, None, &document_ids); - } - } else { - let words = Stemmer::new(&text, language, MAX_TOKEN_LENGTH) - .map(|token| (token.word, token.stemmed_word.unwrap_or(Cow::from("")))) - .collect::>(); - let mut requested_keys = AHashSet::default(); - let mut text_bitmap = None; - - for (word, stemmed_word) in &words { - let mut keys = Vec::new(); - - for (word, family) in [ - (word, BM_TERM | TERM_EXACT), - (word, BM_TERM | TERM_STEMMED), - (stemmed_word, BM_TERM | TERM_EXACT), - (stemmed_word, BM_TERM | TERM_STEMMED), - ] { - if !word.is_empty() { - let key = BitmapKey { - account_id, - collection, - family, - field, - key: word.as_bytes(), - }; - if !requested_keys.contains(&key) { - requested_keys.insert(key); - keys.push(key); - } - } - } - - // Term already matched on a previous iteration - if keys.is_empty() { - continue; - } - - Filter::And.apply( - &mut text_bitmap, - self.get_bitmaps_union(keys)?, - &document_ids, - ); - - if text_bitmap.as_ref().unwrap().is_empty() { - break; - } - } - state.op.apply(&mut state.bm, text_bitmap, &document_ids); - } + state.op.apply( + &mut state.bm, + self.fts_query( + account_id, + collection, + field, + &text, + language, + match_phrase, + )?, + &document_ids, + ); } Filter::InBitmap { family, field, key } => { state.op.apply( @@ -228,6 +135,8 @@ impl Store { } } + //println!("{:?}: {:?}", state.op, state.bm); + if matches!(state.op, Filter::And) && state.bm.as_ref().unwrap().is_empty() { while let Some(filter) = filters.peek() { if matches!(filter, Filter::End) { diff --git a/src/query/mod.rs b/src/query/mod.rs index 576cb3b0..50156244 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -56,6 +56,7 @@ pub enum Comparator { DocumentSet { set: RoaringBitmap, ascending: bool }, } +#[derive(Debug)] pub struct ResultSet { results: RoaringBitmap, document_ids: RoaringBitmap, @@ -68,7 +69,7 @@ pub struct SortedResultRet { } impl Filter { - pub fn new_condition(field: impl Into, op: Operator, value: impl Serialize) -> Self { + pub fn cond(field: impl Into, op: Operator, value: impl Serialize) -> Self { Filter::MatchValue { field: field.into(), op, @@ -116,7 +117,26 @@ impl Filter { } } - pub fn match_text(field: impl Into, mut text: String, mut language: Language) -> Self { + pub fn has_keyword(field: impl Into, value: impl Into) -> Self { + Filter::HasKeyword { + field: field.into(), + value: value.into(), + } + } + + pub fn has_keywords(field: impl Into, value: impl Into) -> Self { + Filter::HasKeywords { + field: field.into(), + value: value.into(), + } + } + + pub fn match_text( + field: impl Into, + text: impl Into, + mut language: Language, + ) -> Self { + let mut text = text.into(); let match_phrase = (text.starts_with('"') && text.ends_with('"')) || (text.starts_with('\'') && text.ends_with('\'')); @@ -141,6 +161,11 @@ impl Filter { match_phrase, } } + + #[cfg(test)] + pub fn match_english(field: impl Into, text: impl Into) -> Self { + Self::match_text(field, text, Language::English) + } } impl Comparator { diff --git a/src/tests/mod.rs b/src/tests/mod.rs new file mode 100644 index 00000000..7c58068c --- /dev/null +++ b/src/tests/mod.rs @@ -0,0 +1,85 @@ +pub mod query; + +use std::{collections::BTreeSet, f64::consts::LN_2, io::Read, time::Instant}; + +use bitpacking::{BitPacker, BitPacker4x, BitPacker8x}; +use rand::Rng; +use roaring::RoaringBitmap; + +use crate::fts::{ + 
bloom::BloomFilter, + stemmer::{StemmedToken, Stemmer}, + Language, +}; + +use super::*; + +pub fn deflate_artwork_data() -> Vec { + let mut csv_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + csv_path.push("src"); + csv_path.push("tests"); + csv_path.push("resources"); + csv_path.push("artwork_data.csv.gz"); + + let mut decoder = flate2::bufread::GzDecoder::new(std::io::BufReader::new( + std::fs::File::open(csv_path).unwrap(), + )); + let mut result = Vec::new(); + decoder.read_to_end(&mut result).unwrap(); + result +} + +#[test] +fn it_works() { + for n in [10, 100, 1000, 5000, 10000, 100000] { + let mut rb1 = RoaringBitmap::new(); + let mut h = BTreeSet::new(); + let m = (((n as f64) * f64::ln(0.01) / (-8.0 * LN_2.powi(2))).ceil() as u64) * 8; + + for pos in 0..(n * 7_usize) { + let num = rand::thread_rng().gen_range(0..m as u32); + rb1.insert(num); + h.insert(num); + } + + let mut compressed = vec![0u8; 4 * BitPacker8x::BLOCK_LEN]; + let mut bitpacker = BitPacker8x::new(); + let mut initial_value = 0; + let mut bytes = vec![]; + for chunk in h + .into_iter() + .collect::>() + .chunks_exact(BitPacker8x::BLOCK_LEN) + { + let num_bits: u8 = bitpacker.num_bits_sorted(initial_value, chunk); + let compressed_len = + bitpacker.compress_sorted(initial_value, chunk, &mut compressed[..], num_bits); + initial_value = chunk[chunk.len() - 1]; + //println!("{:?} {}", compressed_len, num_bits); + bytes.push(num_bits); + bytes.extend_from_slice(&compressed[..compressed_len]); + } + + let rb_size = rb1.serialized_size(); + let bp_size = bytes.len(); + if rb_size < bp_size { + println!("For {} Roaring is better {} vs {}", n, rb_size, bp_size); + } else { + println!("For {} BitPack is better {} vs {}", n, bp_size, rb_size); + } + let now = Instant::now(); + let mut ser = Vec::with_capacity(rb_size); + rb1.serialize_into(&mut ser).unwrap(); + println!("Roaring serialization took {:?}", now.elapsed().as_millis()); + let now = Instant::now(); + let deser = RoaringBitmap::deserialize_unchecked_from(&ser[..]).unwrap(); + println!( + "Roaring deserialization took {:?}", + now.elapsed().as_millis() + ); + } + /*println!( + "ratio: {}", + rb1.serialized_size() as f64 / rb2.serialized_size() as f64 + );*/ +} diff --git a/src/tests/query.rs b/src/tests/query.rs new file mode 100644 index 00000000..ac53b441 --- /dev/null +++ b/src/tests/query.rs @@ -0,0 +1,441 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +use std::{ + sync::{Arc, Mutex}, + time::Instant, +}; + +use ahash::AHashMap; + +use crate::{ + fts::{builder::FtsIndexBuilder, Language}, + query::{Comparator, Filter}, + tests::deflate_artwork_data, + write::{BatchBuilder, IntoBitmap, F_INDEX, F_TOKENIZE, F_VALUE}, + Store, ValueKey, BM_DOCUMENT_IDS, +}; + +pub const FIELDS: [&str; 20] = [ + "id", + "accession_number", + "artist", + "artistRole", + "artistId", + "title", + "dateText", + "medium", + "creditLine", + "year", + "acquisitionYear", + "dimensions", + "width", + "height", + "depth", + "units", + "inscription", + "thumbnailCopyright", + "thumbnailUrl", + "url", +]; + +const COLLECTION_ID: u8 = 0; + +enum FieldType { + Keyword, + Text, + FullText, + Integer, +} + +const FIELDS_OPTIONS: [FieldType; 20] = [ + FieldType::Integer, // "id", + FieldType::Keyword, // "accession_number", + FieldType::Text, // "artist", + FieldType::Keyword, // "artistRole", + FieldType::Integer, // "artistId", + FieldType::FullText, // "title", + FieldType::FullText, // "dateText", + FieldType::FullText, // "medium", + FieldType::FullText, // "creditLine", + FieldType::Integer, // "year", + FieldType::Integer, // "acquisitionYear", + FieldType::FullText, // "dimensions", + FieldType::Integer, // "width", + FieldType::Integer, // "height", + FieldType::Integer, // "depth", + FieldType::Text, // "units", + FieldType::FullText, // "inscription", + FieldType::Text, // "thumbnailCopyright", + FieldType::Text, // "thumbnailUrl", + FieldType::Text, // "url", +]; + +#[test] +pub fn db_test() { + let db = Store::open().unwrap(); + test(&db, false); +} + +#[allow(clippy::mutex_atomic)] +pub fn test(db: &Store, do_insert: bool) { + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(8) + .build() + .unwrap(); + let now = Instant::now(); + let documents = Arc::new(Mutex::new(Vec::new())); + + if do_insert { + pool.scope_fifo(|s| { + for (document_id, record) in csv::ReaderBuilder::new() + .has_headers(true) + .from_reader(&deflate_artwork_data()[..]) + .records() + .enumerate() + { + let record = record.unwrap(); + let documents = documents.clone(); + + s.spawn_fifo(move |_| { + let mut fts_builder = FtsIndexBuilder::with_default_language(Language::English); + let mut builder = BatchBuilder::new(); + builder + .with_account_id(0) + .with_collection(COLLECTION_ID) + .update_document(document_id as u32) // Speed up insertion by manually assigning id + .bitmap(u8::MAX, (), 0); + for (pos, field) in record.iter().enumerate() { + let field_id = pos as u8; + match FIELDS_OPTIONS[pos] { + FieldType::Text => { + if !field.is_empty() { + builder.value( + field_id, + field.to_lowercase(), + F_VALUE | F_TOKENIZE, + ); + } + } + FieldType::FullText => { + if !field.is_empty() { + fts_builder.index( + field_id, + field.to_lowercase(), + Language::English, + ); + if field_id == 7 { + builder.value(field_id, field.to_lowercase(), F_INDEX); + } + } + } + FieldType::Integer => { + builder.value( + field_id, + field.parse::().unwrap_or(0), + F_VALUE | F_INDEX, + ); + } + FieldType::Keyword => { + if !field.is_empty() { + builder.value( + field_id, + field.to_lowercase(), + F_VALUE | F_INDEX | F_TOKENIZE, + ); + } + } + } + } + builder.custom(fts_builder).unwrap(); + documents.lock().unwrap().push(builder.build()); + }); + } + }); + + println!( + "Parsed {} entries in {} ms.", + documents.lock().unwrap().len(), + now.elapsed().as_millis() + ); + + let db_ = Arc::new(db); + let now = Instant::now(); + + pool.scope_fifo(|s| { + let mut documents = documents.lock().unwrap(); 
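+            // Write each prepared batch from the thread pool; Store::write
+            // retries internally while RocksDB reports Busy, MergeInProgress
+            // or TryAgain (see backend/rocksdb/write.rs).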
+ + for document in documents.drain(..) { + let db = db_.clone(); + s.spawn_fifo(move |_| { + db.write(document).unwrap(); + }); + } + }); + + println!("Insert took {} ms.", now.elapsed().as_millis()); + } + + println!("Running filter tests..."); + test_filter(db); + + println!("Running sort tests..."); + test_sort(db); +} + +impl IntoBitmap for () { + fn into_bitmap(self) -> (Vec, u8) { + (vec![], BM_DOCUMENT_IDS) + } +} + +pub fn test_filter(db: &Store) { + let mut fields = AHashMap::default(); + for (field_num, field) in FIELDS.iter().enumerate() { + fields.insert(field.to_string(), field_num as u8); + } + + let tests = [ + ( + vec![ + Filter::match_english(fields["title"], "water"), + Filter::eq(fields["year"], 1979u32), + ], + vec!["p11293"], + ), + ( + vec![ + Filter::match_english(fields["medium"], "gelatin"), + Filter::gt(fields["year"], 2000u32), + Filter::lt(fields["width"], 180u32), + Filter::gt(fields["width"], 0u32), + ], + vec!["p79426", "p79427", "p79428", "p79429", "p79430"], + ), + ( + vec![Filter::match_english(fields["title"], "'rustic bridge'")], + vec!["d05503"], + ), + ( + vec![ + Filter::match_english(fields["title"], "'rustic'"), + Filter::match_english(fields["title"], "study"), + ], + vec!["d00399", "d05352"], + ), + ( + vec![ + Filter::has_keywords(fields["artist"], "mauro kunst"), + Filter::has_keyword(fields["artistRole"], "artist"), + Filter::Or, + Filter::eq(fields["year"], 1969u32), + Filter::eq(fields["year"], 1971u32), + Filter::End, + ], + vec!["p01764", "t05843"], + ), + ( + vec![ + Filter::Not, + Filter::match_english(fields["medium"], "oil"), + Filter::End, + Filter::match_english(fields["creditLine"], "bequeath"), + Filter::Or, + Filter::And, + Filter::ge(fields["year"], 1900u32), + Filter::lt(fields["year"], 1910u32), + Filter::End, + Filter::And, + Filter::ge(fields["year"], 2000u32), + Filter::lt(fields["year"], 2010u32), + Filter::End, + Filter::End, + ], + vec![ + "n02478", "n02479", "n03568", "n03658", "n04327", "n04328", "n04721", "n04739", + "n05095", "n05096", "n05145", "n05157", "n05158", "n05159", "n05298", "n05303", + "n06070", "t01181", "t03571", "t05805", "t05806", "t12147", "t12154", "t12155", + ], + ), + ( + vec![ + Filter::And, + Filter::has_keyword(fields["artist"], "warhol"), + Filter::Not, + Filter::match_english(fields["title"], "'campbell'"), + Filter::End, + Filter::Not, + Filter::Or, + Filter::gt(fields["year"], 1980u32), + Filter::And, + Filter::gt(fields["width"], 500u32), + Filter::gt(fields["height"], 500u32), + Filter::End, + Filter::End, + Filter::End, + Filter::eq(fields["acquisitionYear"], 2008u32), + Filter::End, + ], + vec!["ar00039", "t12600"], + ), + ( + vec![ + Filter::match_english(fields["title"], "study"), + Filter::match_english(fields["medium"], "paper"), + Filter::match_english(fields["creditLine"], "'purchased'"), + Filter::Not, + Filter::match_english(fields["title"], "'anatomical'"), + Filter::match_english(fields["title"], "'for'"), + Filter::End, + Filter::gt(fields["year"], 1900u32), + Filter::gt(fields["acquisitionYear"], 2000u32), + ], + vec![ + "p80042", "p80043", "p80044", "p80045", "p80203", "t11937", "t12172", + ], + ), + ]; + + for (filter, expected_results) in tests { + println!("Running test: {:?}", filter); + let mut results: Vec = Vec::with_capacity(expected_results.len()); + let docset = db.filter(0, COLLECTION_ID, filter).unwrap(); + let sorted_docset = db + .sort( + 0, + COLLECTION_ID, + docset, + vec![Comparator::ascending(fields["accession_number"])], + 0, + 0, + None, + 0, + ) + 
.unwrap(); + + for document_id in sorted_docset.ids { + results.push( + db.get_value(ValueKey { + account_id: 0, + collection: COLLECTION_ID, + document_id, + field: fields["accession_number"], + }) + .unwrap() + .unwrap(), + ); + } + assert_eq!(results, expected_results); + } +} + +pub fn test_sort(db: &Store) { + let mut fields = AHashMap::default(); + for (field_num, field) in FIELDS.iter().enumerate() { + fields.insert(field.to_string(), field_num as u8); + } + + let tests = [ + ( + vec![ + Filter::gt(fields["year"], 0u32), + Filter::gt(fields["acquisitionYear"], 0u32), + Filter::gt(fields["width"], 0u32), + ], + vec![ + Comparator::descending(fields["year"]), + Comparator::ascending(fields["acquisitionYear"]), + Comparator::ascending(fields["width"]), + Comparator::descending(fields["accession_number"]), + ], + vec![ + "t13655", "t13811", "p13352", "p13351", "p13350", "p13349", "p13348", "p13347", + "p13346", "p13345", "p13344", "p13342", "p13341", "p13340", "p13339", "p13338", + "p13337", "p13336", "p13335", "p13334", "p13333", "p13332", "p13331", "p13330", + "p13329", "p13328", "p13327", "p13326", "p13325", "p13324", "p13323", "t13786", + "p13322", "p13321", "p13320", "p13319", "p13318", "p13317", "p13316", "p13315", + "p13314", "t13588", "t13587", "t13586", "t13585", "t13584", "t13540", "t13444", + "ar01154", "ar01153", + ], + ), + ( + vec![ + Filter::gt(fields["width"], 0u32), + Filter::gt(fields["height"], 0u32), + ], + vec![ + Comparator::descending(fields["width"]), + Comparator::ascending(fields["height"]), + ], + vec![ + "t03681", "t12601", "ar00166", "t12625", "t12915", "p04182", "t06483", "ar00703", + "t07671", "ar00021", "t05557", "t07918", "p06298", "p05465", "p06640", "t12855", + "t01355", "t12800", "t12557", "t02078", + ], + ), + ( + vec![], + vec![ + Comparator::descending(fields["medium"]), + Comparator::descending(fields["artistRole"]), + Comparator::ascending(fields["accession_number"]), + ], + vec![ + "ar00627", "ar00052", "t00352", "t07275", "t12318", "t04931", "t13683", "t13686", + "t13687", "t13688", "t13689", "t13690", "t13691", "t07766", "t07918", "t12993", + "ar00044", "t13326", "t07614", "t12414", + ], + ), + ]; + + for (filter, sort, expected_results) in tests { + let mut results: Vec = Vec::with_capacity(expected_results.len()); + let docset = db.filter(0, COLLECTION_ID, filter).unwrap(); + let sorted_docset = db + .sort( + 0, + COLLECTION_ID, + docset, + sort, + expected_results.len(), + 0, + None, + 0, + ) + .unwrap(); + + for document_id in sorted_docset.ids { + results.push( + db.get_value(ValueKey { + account_id: 0, + collection: COLLECTION_ID, + document_id, + field: fields["accession_number"], + }) + .unwrap() + .unwrap(), + ); + } + assert_eq!(results, expected_results); + } +} diff --git a/src/tests/resources/artwork_data.csv.gz b/src/tests/resources/artwork_data.csv.gz new file mode 100644 index 00000000..f6663785 Binary files /dev/null and b/src/tests/resources/artwork_data.csv.gz differ diff --git a/src/write/batch.rs b/src/write/batch.rs index d4f87409..e4590c4e 100644 --- a/src/write/batch.rs +++ b/src/write/batch.rs @@ -9,32 +9,45 @@ impl BatchBuilder { pub fn new() -> Self { Self { ops: Vec::new(), - last_account_id: 0, - last_document_id: 0, last_collection: 0, } } - pub fn with_context( - &mut self, - account_id: u32, - document_id: u32, - collection: impl Into, - ) -> &mut Self { - self.last_account_id = account_id; - self.last_document_id = document_id; - self.last_collection = collection.into(); - self.push_context(); + pub fn 
with_account_id(&mut self, account_id: u32) -> &mut Self { + self.ops.push(Operation::AccountId { account_id }); self } - #[inline(always)] - pub(super) fn push_context(&mut self) { - self.ops.push(Operation::WithContext { - account_id: self.last_account_id, - document_id: self.last_document_id, + pub fn with_collection(&mut self, collection: impl Into) -> &mut Self { + self.last_collection = collection.into(); + self.ops.push(Operation::Collection { collection: self.last_collection, }); + self + } + + pub fn create_document(&mut self) -> &mut Self { + self.ops.push(Operation::DocumentId { + document_id: u32::MAX, + set: true, + }); + self + } + + pub fn update_document(&mut self, document_id: u32) -> &mut Self { + self.ops.push(Operation::DocumentId { + document_id, + set: true, + }); + self + } + + pub fn delete_document(&mut self, document_id: u32) -> &mut Self { + self.ops.push(Operation::DocumentId { + document_id, + set: false, + }); + self } pub fn value( @@ -51,7 +64,7 @@ impl BatchBuilder { self.ops.push(Operation::Bitmap { family: BM_TERM | TERM_EXACT, field, - key: token, + key: token.into_bytes(), set: is_set, }); } @@ -87,9 +100,9 @@ impl BatchBuilder { }); } - pub fn acl(&mut self, to_account_id: u32, acl: Option) { + pub fn acl(&mut self, grant_account_id: u32, acl: Option) { self.ops.push(Operation::Acl { - to_account_id, + grant_account_id, set: acl.map(|acl| acl.serialize()), }) } diff --git a/src/write/key.rs b/src/write/key.rs index 078e23fe..e844d50c 100644 --- a/src/write/key.rs +++ b/src/write/key.rs @@ -66,6 +66,12 @@ impl KeySerialize for u32 { } } +impl KeySerialize for u16 { + fn serialize(&self, buf: &mut Vec) { + buf.extend_from_slice(&self.to_be_bytes()); + } +} + impl KeySerialize for u64 { fn serialize(&self, buf: &mut Vec) { buf.extend_from_slice(&self.to_be_bytes()); diff --git a/src/write/log.rs b/src/write/log.rs index 85851096..4b628056 100644 --- a/src/write/log.rs +++ b/src/write/log.rs @@ -71,7 +71,7 @@ impl IntoOperations for ChangeLogBuilder { for (collection, changes) in self.changes { if collection != batch.last_collection { batch.last_collection = collection; - batch.push_context(); + batch.ops.push(Operation::Collection { collection }); } batch.ops.push(Operation::Log { diff --git a/src/write/mod.rs b/src/write/mod.rs index b68e8d37..2fd2b63f 100644 --- a/src/write/mod.rs +++ b/src/write/mod.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use crate::Serialize; +use crate::{Deserialize, Serialize}; pub mod batch; pub mod key; @@ -16,18 +16,21 @@ pub struct Batch { } pub struct BatchBuilder { - pub last_account_id: u32, - pub last_document_id: u32, pub last_collection: u8, pub ops: Vec, } pub enum Operation { - WithContext { + AccountId { account_id: u32, - document_id: u32, + }, + Collection { collection: u8, }, + DocumentId { + document_id: u32, + set: bool, + }, Value { field: u8, set: Option>, @@ -43,12 +46,17 @@ pub enum Operation { key: Vec, set: bool, }, + Bloom { + field: u8, + family: u8, + set: Option>, + }, Blob { key: Vec, set: bool, }, Acl { - to_account_id: u32, + grant_account_id: u32, set: Option>, }, Log { @@ -69,6 +77,12 @@ impl Serialize for u64 { } } +impl Serialize for u16 { + fn serialize(self) -> Vec { + self.to_be_bytes().to_vec() + } +} + impl Serialize for f64 { fn serialize(self) -> Vec { self.to_be_bytes().to_vec() @@ -87,6 +101,18 @@ impl Serialize for String { } } +impl Serialize for Vec { + fn serialize(self) -> Vec { + self + } +} + +impl Deserialize for String { + fn deserialize(bytes: &[u8]) -> Option { + 
String::from_utf8_lossy(bytes).into_owned().into()
+    }
+}
+
 trait HasFlag {
     fn has_flag(&self, flag: u32) -> bool;
 }
@@ -99,11 +125,11 @@ impl HasFlag for u32 {
     }
 }
 
 pub trait Tokenize {
-    fn tokenize(&self) -> HashSet<Vec<u8>>;
+    fn tokenize(&self) -> HashSet<String>;
 }
 
 impl Tokenize for &str {
-    fn tokenize(&self) -> HashSet<Vec<u8>> {
+    fn tokenize(&self) -> HashSet<String> {
         let mut tokens = HashSet::new();
         let mut token = String::new();
@@ -115,35 +141,39 @@ impl Tokenize for &str {
                     token.push(ch);
                 }
             } else if !token.is_empty() {
-                tokens.insert(token.into_bytes());
+                tokens.insert(token);
                 token = String::new();
             }
         }
 
+        if !token.is_empty() {
+            tokens.insert(token);
+        }
+
         tokens
     }
 }
 
 impl Tokenize for String {
-    fn tokenize(&self) -> HashSet<Vec<u8>> {
+    fn tokenize(&self) -> HashSet<String> {
         self.as_str().tokenize()
     }
 }
 
 impl Tokenize for u32 {
-    fn tokenize(&self) -> HashSet<Vec<u8>> {
+    fn tokenize(&self) -> HashSet<String> {
         unreachable!()
     }
 }
 
 impl Tokenize for u64 {
-    fn tokenize(&self) -> HashSet<Vec<u8>> {
+    fn tokenize(&self) -> HashSet<String> {
         unreachable!()
     }
 }
 
 impl Tokenize for f64 {
-    fn tokenize(&self) -> HashSet<Vec<u8>> {
+    fn tokenize(&self) -> HashSet<String> {
         unreachable!()
     }
 }
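
Reviewer note: a minimal end-to-end sketch of how the new write and query paths compose, distilled from the `src/tests/query.rs` harness above. The crate path `store`, the account/collection/field ids and the sample strings are illustrative placeholders, not part of this change:

```rust
use store::{
    fts::{builder::FtsIndexBuilder, Language},
    query::Filter,
    write::{BatchBuilder, F_VALUE},
    Store,
};

const ACCOUNT_ID: u32 = 0;
const COLLECTION: u8 = 0;
const TITLE: u8 = 0; // full-text field (placeholder id)
const NOTES: u8 = 1; // stored value field (placeholder id)

fn main() -> store::Result<()> {
    let db = Store::open()?;

    // Full-text content goes through the bloom-filter builder, which emits
    // Operation::Bitmap (high-rank term bitmaps) and Operation::Bloom
    // (per-field stemmed/bigram/trigram filters) into the batch.
    let mut fts = FtsIndexBuilder::with_default_language(Language::English);
    fts.index(TITLE, "a study of rustic bridges", Language::English);

    let mut batch = BatchBuilder::new();
    batch
        .with_account_id(ACCOUNT_ID)
        .with_collection(COLLECTION)
        .create_document(); // assigns the next free document id on write
    batch.value(NOTES, "stored as-is".to_string(), F_VALUE);
    batch.custom(fts)?;
    db.write(batch.build())?;

    // Unquoted text takes the stemmed path in fts_query; quoting it
    // ("'rustic bridges'") would switch to the bigram/trigram filters.
    let results = db.filter(
        ACCOUNT_ID,
        COLLECTION,
        vec![Filter::match_text(TITLE, "rustic bridge", Language::English)],
    )?;
    println!("{results:?}");
    Ok(())
}
```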
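And a quick sanity check on the sizing in `BloomFilter::new`: `estimate_m` is the standard bloom bound m = n ln p / (-ln^2 2) evaluated at the fixed target P = 0.01 (computed in bytes, hence the -8.0 and the trailing * 8), then clamped to a 10 240-position floor, while the currently unused `estimate_k` would put the optimal probe count at about 7, matching the seven probes hard-coded in `insert`/`contains`. A standalone copy for illustration:

```rust
use std::f64::consts::LN_2;

// Standalone copy of BloomFilter::estimate_m (src/fts/bloom.rs), for
// illustration only.
fn estimate_m(n: usize, p: f64) -> u64 {
    (((n as f64) * f64::ln(p) / (-8.0 * LN_2.powi(2))).ceil() as u64) * 8
}

fn main() {
    // Small fields sit on the 10 240 floor; larger ones scale at roughly
    // 9.6 bit positions per unique term (m/n ~ -ln p / ln^2 2).
    for n in [100usize, 1_000, 10_000] {
        println!("n = {:6} -> m = {}", n, estimate_m(n, 0.01).max(10_240));
    }
}
```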