From 8cbe5bebc322eecc7b17a1a1582b94b4ae85b5bb Mon Sep 17 00:00:00 2001 From: Mauro D Date: Wed, 22 Mar 2023 17:41:58 +0000 Subject: [PATCH] f1 --- .gitignore | 2 + Cargo.toml | 19 + pepe.toml | 92 +++ src/backend/foundationdb/mod.rs | 0 src/backend/mod.rs | 2 + src/backend/rocksdb/bitmap.rs | 326 +++++++++ src/backend/rocksdb/main.rs | 135 ++++ src/backend/rocksdb/mod.rs | 50 ++ src/backend/rocksdb/read.rs | 217 ++++++ src/fts/index.rs | 157 +++++ src/fts/lang.rs | 252 +++++++ src/fts/mod.rs | 161 +++++ src/fts/pdf.rs | 54 ++ src/fts/search_snippet.rs | 277 ++++++++ src/fts/stemmer.rs | 168 +++++ src/fts/term_index.rs | 995 ++++++++++++++++++++++++++++ src/fts/tokenizers/chinese.rs | 197 ++++++ src/fts/tokenizers/indo_european.rs | 167 +++++ src/fts/tokenizers/japanese.rs | 168 +++++ src/fts/tokenizers/mod.rs | 95 +++ src/fts/tokenizers/word.rs | 80 +++ src/lib.rs | 91 +++ src/query/filter.rs | 299 +++++++++ src/query/mod.rs | 160 +++++ src/query/sort.rs | 424 ++++++++++++ src/rocksdb.rs | 343 ++++++++++ src/write/batch.rs | 117 ++++ src/write/key.rs | 93 +++ src/write/log.rs | 109 +++ src/write/mod.rs | 157 +++++ 30 files changed, 5407 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 pepe.toml create mode 100644 src/backend/foundationdb/mod.rs create mode 100644 src/backend/mod.rs create mode 100644 src/backend/rocksdb/bitmap.rs create mode 100644 src/backend/rocksdb/main.rs create mode 100644 src/backend/rocksdb/mod.rs create mode 100644 src/backend/rocksdb/read.rs create mode 100644 src/fts/index.rs create mode 100644 src/fts/lang.rs create mode 100644 src/fts/mod.rs create mode 100644 src/fts/pdf.rs create mode 100644 src/fts/search_snippet.rs create mode 100644 src/fts/stemmer.rs create mode 100644 src/fts/term_index.rs create mode 100644 src/fts/tokenizers/chinese.rs create mode 100644 src/fts/tokenizers/indo_european.rs create mode 100644 src/fts/tokenizers/japanese.rs create mode 100644 src/fts/tokenizers/mod.rs create mode 100644 src/fts/tokenizers/word.rs create mode 100644 src/lib.rs create mode 100644 src/query/filter.rs create mode 100644 src/query/mod.rs create mode 100644 src/query/sort.rs create mode 100644 src/rocksdb.rs create mode 100644 src/write/batch.rs create mode 100644 src/write/key.rs create mode 100644 src/write/log.rs create mode 100644 src/write/mod.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..4fffb2f8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +/Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..9588f406 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "store" +version = "0.1.0" +edition = "2021" + + +[dependencies] +utils = { path = "../utils" } +rand = "0.8.5" +roaring = "0.10.1" +rocksdb = "0.20.1" +serde = { version = "1.0", features = ["derive"]} +ahash = { version = "0.8.0", features = ["serde"] } +bitpacking = "0.8.4" +lazy_static = "1.4" +whatlang = "0.16" # Language detection +rust-stemmers = "1.2" # Stemmers +tinysegmenter = "0.1" # Japanese tokenizer +jieba-rs = "0.6" # Chinese stemmer diff --git a/pepe.toml b/pepe.toml new file mode 100644 index 00000000..5264166f --- /dev/null +++ b/pepe.toml @@ -0,0 +1,92 @@ +I have the following SQLite table for storing email data: + +CREATE TABLE email ( + email_id INTEGER PRIMARY KEY, + blob_id TEXT NOT NULL, + thread_id INTEGER NOT NULL, + size INTEGER NOT NULL, + received_at TIMESTAMP NOT NULL, + message_id TEXT NOT NULL, + in_reply_to TEXT NOT NULL, + sender TEXT NOT 
NULL, + from TEXT NOT NULL, + to TEXT NOT NULL, + cc TEXT NOT NULL, + bcc TEXT NOT NULL, + reply_to TEXT NOT NULL, + subject TEXT NOT NULL, + sent_at TIMESTAMP NOT NULL, + has_attachment BOOL NOT NULL, + preview TEXT NOT NULL +); + +The mailboxes and keywords for each message are stored in separate tables: + +CREATE TABLE email_mailbox ( + email_id INTEGER PRIMARY KEY, + mailbox_id INTEGER NOT NULL, +) + +CREATE TABLE email_keyword ( + email_id INTEGER PRIMARY KEY, + keyword TEXT NOT NULL +); + +How would you write a SQLite query to list the email IDs of all the Emails with the subject "sales" sorted by messages that belong to the same Thread and have a the keyword "draft", then sorted by received at and then has attachment. + + +[email] +id: INT +blob_id: HASH +thread_id: INT +size: INT +received_at: TIMESTAMP +message_id: TEXT +in_reply_to: TEXT +sender: TEXT +from: TEXT +to: TEXT +cc: TEXT +bcc: TEXT +reply_to: TEXT +subject: TEXT +sent_at: TIMESTAMP +has_attachment: BOOL +preview: TEXT + +[email_mailbox] +email_id: INT +mailbox_id: INT +imap_uid: INT + +[email_keyword] +email_id: INT +keyword: TEXT + + +/* + + o id + o blobId + o threadId + o mailboxIds + o keywords + o size + o receivedAt + o messageId + o inReplyTo + o sender + o from + o to + o cc + o bcc + o replyTo + o subject + o sentAt + o hasAttachment + o preview + +[ "partId", "blobId", "size", "name", "type", "charset", + "disposition", "cid", "language", "location" ] + +*/ \ No newline at end of file diff --git a/src/backend/foundationdb/mod.rs b/src/backend/foundationdb/mod.rs new file mode 100644 index 00000000..e69de29b diff --git a/src/backend/mod.rs b/src/backend/mod.rs new file mode 100644 index 00000000..54822ba1 --- /dev/null +++ b/src/backend/mod.rs @@ -0,0 +1,2 @@ +pub mod foundationdb; +pub mod rocksdb; diff --git a/src/backend/rocksdb/bitmap.rs b/src/backend/rocksdb/bitmap.rs new file mode 100644 index 00000000..e355701a --- /dev/null +++ b/src/backend/rocksdb/bitmap.rs @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/
+
+use crate::{Deserialize, Serialize};
+use roaring::RoaringBitmap;
+use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
+
+pub const BIT_SET: u8 = 0x80;
+pub const BIT_CLEAR: u8 = 0;
+
+pub const IS_BITLIST: u8 = 0;
+pub const IS_BITMAP: u8 = 1;
+
+#[inline(always)]
+pub fn deserialize_bitlist(bm: &mut RoaringBitmap, bytes: &[u8]) {
+    let mut it = bytes[1..].iter();
+
+    'inner: while let Some(header) = it.next() {
+        let mut items = (header & 0x7F) + 1;
+        let is_set = (header & BIT_SET) != 0;
+
+        while items > 0 {
+            if let Some(doc_id) = it.next_leb128() {
+                if is_set {
+                    bm.insert(doc_id);
+                } else {
+                    bm.remove(doc_id);
+                }
+                items -= 1;
+            } else {
+                debug_assert!(items == 0, "{:?}", bytes);
+                break 'inner;
+            }
+        }
+    }
+}
+
+#[inline(always)]
+pub fn deserialize_bitmap(bytes: &[u8]) -> Option<RoaringBitmap> {
+    RoaringBitmap::deserialize_unchecked_from(&bytes[1..]).ok()
+}
+
+impl Deserialize for RoaringBitmap {
+    fn deserialize(bytes: &[u8]) -> Option<Self> {
+        match *bytes.first()? {
+            IS_BITMAP => deserialize_bitmap(bytes),
+            IS_BITLIST => {
+                let mut bm = RoaringBitmap::new();
+                deserialize_bitlist(&mut bm, bytes);
+                Some(bm)
+            }
+            _ => None,
+        }
+    }
+}
+
+impl Serialize for RoaringBitmap {
+    fn serialize(self) -> Vec<u8> {
+        let mut bytes = Vec::with_capacity(self.serialized_size() + 1);
+        bytes.push(IS_BITMAP);
+        let _ = self.serialize_into(&mut bytes);
+        bytes
+    }
+}
+
+macro_rules! impl_bit {
+    ($single:ident, $many:ident, $flag:ident) => {
+        #[inline(always)]
+        pub fn $single(document: u32) -> Vec<u8> {
+            let mut buf = Vec::with_capacity(std::mem::size_of::<u32>() + 2);
+            buf.push(IS_BITLIST);
+            buf.push($flag);
+            buf.push_leb128(document);
+            buf
+        }
+
+        #[inline(always)]
+        pub fn $many<T>(documents: T) -> Vec<u8>
+        where
+            T: Iterator<Item = u32>,
+        {
+            debug_assert!(documents.size_hint().0 > 0);
+
+            let mut buf = Vec::with_capacity(
+                ((std::mem::size_of::<u32>() + 1)
+                    * documents
+                        .size_hint()
+                        .1
+                        .unwrap_or_else(|| documents.size_hint().0))
+                    + 2,
+            );
+
+            buf.push(IS_BITLIST);
+
+            let mut header_pos = 0;
+            let mut total_docs = 0;
+
+            for (pos, document) in documents.enumerate() {
+                if pos & 0x7F == 0 {
+                    header_pos = buf.len();
+                    buf.push($flag | 0x7F);
+                }
+                buf.push_leb128(document);
+                total_docs = pos;
+            }
+
+            buf[header_pos] = $flag | ((total_docs & 0x7F) as u8);
+
+            buf
+        }
+    };
+}
+
+impl_bit!(set_bit, set_bits, BIT_SET);
+impl_bit!(clear_bit, clear_bits, BIT_CLEAR);
+
+#[inline(always)]
+pub fn set_clear_bits<T>(documents: T) -> Vec<u8>
+where
+    T: Iterator<Item = (u32, bool)>,
+{
+    debug_assert!(documents.size_hint().0 > 0);
+
+    let total_docs = documents
+        .size_hint()
+        .1
+        .unwrap_or_else(|| documents.size_hint().0);
+    let buf_len = (std::mem::size_of::<u32>() * total_docs) + (total_docs / 0x7F) + 2;
+    let mut set_buf = Vec::with_capacity(buf_len);
+    let mut clear_buf = Vec::with_capacity(buf_len);
+
+    let mut set_header_pos = 0;
+    let mut set_total_docs = 0;
+
+    let mut clear_header_pos = 0;
+    let mut clear_total_docs = 0;
+
+    set_buf.push(IS_BITLIST);
+    clear_buf.push(IS_BITLIST);
+
+    for (document, is_set) in documents {
+        if is_set {
+            if set_total_docs & 0x7F == 0 {
+                set_header_pos = set_buf.len();
+                set_buf.push(BIT_SET | 0x7F);
+            }
+            set_buf.push_leb128(document);
+            set_total_docs += 1;
+        } else {
+            if clear_total_docs & 0x7F == 0 {
+                clear_header_pos = clear_buf.len();
+                clear_buf.push(BIT_CLEAR | 0x7F);
+            }
+            clear_buf.push_leb128(document);
+            clear_total_docs += 1;
+        }
+    }
+
+    if set_total_docs > 0 {
+        set_buf[set_header_pos] = BIT_SET | (((set_total_docs - 1) & 0x7F) as u8);
+    }
+
+    if clear_total_docs > 0 {
clear_buf[clear_header_pos] = BIT_CLEAR | (((clear_total_docs - 1) & 0x7F) as u8); + } + + if set_total_docs > 0 && clear_total_docs > 0 { + set_buf.extend_from_slice(&clear_buf[1..]); + set_buf + } else if set_total_docs > 0 { + set_buf + } else { + clear_buf + } +} + +#[inline(always)] +pub fn bitmap_merge<'x>( + existing_val: Option<&[u8]>, + operands_len: usize, + operands: impl IntoIterator, +) -> Option> { + let mut bm = match existing_val { + Some(existing_val) => RoaringBitmap::deserialize(existing_val)?, + None if operands_len == 1 => { + return Some(Vec::from(operands.into_iter().next().unwrap())); + } + _ => RoaringBitmap::new(), + }; + + for op in operands.into_iter() { + match *op.first()? { + IS_BITMAP => { + if let Some(union_bm) = deserialize_bitmap(op) { + if !bm.is_empty() { + bm |= union_bm; + } else { + bm = union_bm; + } + } else { + debug_assert!(false, "Failed to deserialize bitmap."); + return None; + } + } + IS_BITLIST => { + deserialize_bitlist(&mut bm, op); + } + _ => { + debug_assert!(false, "This should not have happend"); + return None; + } + } + } + + let mut bytes = Vec::with_capacity(bm.serialized_size() + 1); + bytes.push(IS_BITMAP); + bm.serialize_into(&mut bytes).ok()?; + Some(bytes) +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn merge_bitmaps() { + let v1 = set_clear_bits([(1, true), (2, true), (3, false), (4, true)].into_iter()); + let v2 = set_clear_bits([(1, false), (4, false)].into_iter()); + let v3 = set_clear_bits([(5, true)].into_iter()); + assert_eq!( + RoaringBitmap::from_iter([1, 2, 4]), + RoaringBitmap::deserialize(&v1).unwrap() + ); + assert_eq!( + RoaringBitmap::from_iter([1, 2, 4]), + RoaringBitmap::deserialize(&bitmap_merge(None, 1, [v1.as_ref()]).unwrap()).unwrap() + ); + assert_eq!( + RoaringBitmap::from_iter([2]), + RoaringBitmap::deserialize(&bitmap_merge(None, 2, [v1.as_ref(), v2.as_ref()]).unwrap()) + .unwrap() + ); + assert_eq!( + RoaringBitmap::from_iter([2, 5]), + RoaringBitmap::deserialize( + &bitmap_merge(None, 3, [v1.as_ref(), v2.as_ref(), v3.as_ref()]).unwrap() + ) + .unwrap() + ); + assert_eq!( + RoaringBitmap::from_iter([2, 5]), + RoaringBitmap::deserialize( + &bitmap_merge(Some(v1.as_ref()), 2, [v2.as_ref(), v3.as_ref()]).unwrap() + ) + .unwrap() + ); + assert_eq!( + RoaringBitmap::from_iter([5]), + RoaringBitmap::deserialize(&bitmap_merge(Some(v2.as_ref()), 1, [v3.as_ref()]).unwrap()) + .unwrap() + ); + + assert_eq!( + RoaringBitmap::from_iter([1, 2, 4]), + RoaringBitmap::deserialize( + &bitmap_merge( + Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()), + 1, + [v1.as_ref()] + ) + .unwrap() + ) + .unwrap() + ); + + assert_eq!( + RoaringBitmap::from_iter([1, 2, 3, 4, 5, 6]), + RoaringBitmap::deserialize( + &bitmap_merge( + Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()), + 1, + [RoaringBitmap::from_iter([5, 6]).serialize().as_ref()] + ) + .unwrap() + ) + .unwrap() + ); + + assert_eq!( + RoaringBitmap::from_iter([1, 2, 4, 5, 6]), + RoaringBitmap::deserialize( + &bitmap_merge( + Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()), + 2, + [ + RoaringBitmap::from_iter([5, 6]).serialize().as_ref(), + v1.as_ref() + ] + ) + .unwrap() + ) + .unwrap() + ); + } +} diff --git a/src/backend/rocksdb/main.rs b/src/backend/rocksdb/main.rs new file mode 100644 index 00000000..750fcbb0 --- /dev/null +++ b/src/backend/rocksdb/main.rs @@ -0,0 +1,135 @@ +use std::path::PathBuf; + +use roaring::RoaringBitmap; +use rocksdb::{ColumnFamilyDescriptor, MergeOperands, 
OptimisticTransactionDB, Options}; + +use crate::{Deserialize, Error, Store}; + +use super::{CF_BITMAPS, CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES}; + +impl Store { + pub fn open() -> crate::Result { + // Create the database directory if it doesn't exist + let path = PathBuf::from( + "/tmp/rocksdb.test", /*&settings + .get("db-path") + .unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/ + ); + let mut idx_path = path; + idx_path.push("idx"); + std::fs::create_dir_all(&idx_path).map_err(|err| { + Error::InternalError(format!( + "Failed to create index directory {}: {:?}", + idx_path.display(), + err + )) + })?; + + // Bitmaps + let cf_bitmaps = { + let mut cf_opts = Options::default(); + //cf_opts.set_max_write_buffer_number(16); + cf_opts.set_merge_operator("merge", bitmap_merge, bitmap_partial_merge); + cf_opts.set_compaction_filter("compact", bitmap_compact); + ColumnFamilyDescriptor::new(CF_BITMAPS, cf_opts) + }; + + // Stored values + let cf_values = { + let mut cf_opts = Options::default(); + cf_opts.set_merge_operator_associative("merge", numeric_value_merge); + ColumnFamilyDescriptor::new(CF_VALUES, cf_opts) + }; + + // Secondary indexes + let cf_indexes = { + let cf_opts = Options::default(); + ColumnFamilyDescriptor::new(CF_INDEXES, cf_opts) + }; + + // Blobs + let cf_blobs = { + let mut cf_opts = Options::default(); + cf_opts.set_enable_blob_files(true); + cf_opts.set_min_blob_size( + 16834, /*settings.parse("blob-min-size").unwrap_or(16384) */ + ); + ColumnFamilyDescriptor::new(CF_BLOBS, cf_opts) + }; + + // Raft log and change log + let cf_log = { + let cf_opts = Options::default(); + ColumnFamilyDescriptor::new(CF_LOGS, cf_opts) + }; + + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + Ok(Store { + db: OptimisticTransactionDB::open_cf_descriptors( + &db_opts, + idx_path, + vec![cf_bitmaps, cf_values, cf_indexes, cf_blobs, cf_log], + ) + .map_err(|e| Error::InternalError(e.into_string()))?, + }) + } + + pub fn close(&self) -> crate::Result<()> { + self.db + .flush() + .map_err(|e| Error::InternalError(e.to_string()))?; + self.db.cancel_all_background_work(true); + Ok(()) + } +} + +pub fn numeric_value_merge( + _key: &[u8], + value: Option<&[u8]>, + operands: &MergeOperands, +) -> Option> { + let mut value = if let Some(value) = value { + i64::from_le_bytes(value.try_into().ok()?) 
+    } else {
+        0
+    };
+
+    for op in operands.iter() {
+        value += i64::from_le_bytes(op.try_into().ok()?);
+    }
+
+    let mut bytes = Vec::with_capacity(std::mem::size_of::<i64>());
+    bytes.extend_from_slice(&value.to_le_bytes());
+    Some(bytes)
+}
+
+pub fn bitmap_merge(
+    _new_key: &[u8],
+    existing_val: Option<&[u8]>,
+    operands: &MergeOperands,
+) -> Option<Vec<u8>> {
+    super::bitmap::bitmap_merge(existing_val, operands.len(), operands.into_iter())
+}
+
+pub fn bitmap_partial_merge(
+    _new_key: &[u8],
+    _existing_val: Option<&[u8]>,
+    _operands: &MergeOperands,
+) -> Option<Vec<u8>> {
+    // Force a full merge
+    None
+}
+
+pub fn bitmap_compact(
+    _level: u32,
+    _key: &[u8],
+    value: &[u8],
+) -> rocksdb::compaction_filter::Decision {
+    match RoaringBitmap::deserialize(value) {
+        Some(bm) if bm.is_empty() => rocksdb::compaction_filter::Decision::Remove,
+        _ => rocksdb::compaction_filter::Decision::Keep,
+    }
+}
diff --git a/src/backend/rocksdb/mod.rs b/src/backend/rocksdb/mod.rs
new file mode 100644
index 00000000..47b544fe
--- /dev/null
+++ b/src/backend/rocksdb/mod.rs
@@ -0,0 +1,50 @@
+use crate::{write::key::KeySerializer, BitmapKey, IndexKey, Serialize, ValueKey};
+
+pub mod bitmap;
+pub mod main;
+pub mod read;
+
+pub const CF_BITMAPS: &str = "b";
+pub const CF_VALUES: &str = "v";
+pub const CF_LOGS: &str = "l";
+pub const CF_BLOBS: &str = "o";
+pub const CF_INDEXES: &str = "i";
+
+pub const COLLECTION_PREFIX_LEN: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u8>();
+pub const FIELD_PREFIX_LEN: usize = COLLECTION_PREFIX_LEN + std::mem::size_of::<u8>();
+pub const ACCOUNT_KEY_LEN: usize =
+    std::mem::size_of::<u32>() + std::mem::size_of::<u8>() + std::mem::size_of::<u32>();
+
+impl Serialize for IndexKey<'_> {
+    fn serialize(self) -> Vec<u8> {
+        KeySerializer::new(std::mem::size_of::<IndexKey>() + self.key.len())
+            .write(self.account_id)
+            .write(self.collection)
+            .write(self.field)
+            .write(self.key)
+            .finalize()
+    }
+}
+
+impl Serialize for ValueKey {
+    fn serialize(self) -> Vec<u8> {
+        KeySerializer::new(std::mem::size_of::<ValueKey>())
+            .write_leb128(self.account_id)
+            .write(self.collection)
+            .write_leb128(self.document_id)
+            .write(self.field)
+            .finalize()
+    }
+}
+
+impl Serialize for BitmapKey<'_> {
+    fn serialize(self) -> Vec<u8> {
+        KeySerializer::new(std::mem::size_of::<BitmapKey>() + self.key.len())
+            .write(self.key)
+            .write(self.field)
+            .write(self.collection)
+            .write(self.family)
+            .write_leb128(self.account_id)
+            .finalize()
+    }
+}
diff --git a/src/backend/rocksdb/read.rs b/src/backend/rocksdb/read.rs
new file mode 100644
index 00000000..b119a460
--- /dev/null
+++ b/src/backend/rocksdb/read.rs
@@ -0,0 +1,217 @@
+use std::ops::{BitAndAssign, BitOrAssign};
+
+use roaring::RoaringBitmap;
+use rocksdb::{Direction, IteratorMode};
+
+use crate::{
+    query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, IndexKey,
+    Serialize, Store, ValueKey, BM_DOCUMENT_IDS,
+};
+
+use super::{CF_BITMAPS, CF_INDEXES, CF_VALUES, FIELD_PREFIX_LEN};
+
+impl Store {
+    #[inline(always)]
+    pub fn get_value<U>(&self, key: ValueKey) -> crate::Result<Option<U>>
+    where
+        U: Deserialize,
+    {
+        if let Some(bytes) = self
+            .db
+            .get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key.serialize())
+            .map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
+        {
+            Ok(Some(U::deserialize(&bytes).ok_or_else(|| {
+                Error::InternalError(format!("Failed to deserialize key: {:?}", key))
+            })?))
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[inline(always)]
+    pub fn get_values<U>(&self, keys: Vec<ValueKey>) -> crate::Result<Vec<Option<U>>>
+    where
+        U: Deserialize,
+    {
+        let cf_handle = self.db.cf_handle(CF_VALUES).unwrap();
+        let mut results = Vec::with_capacity(keys.len());
+        for value in self.db.multi_get_cf(
+            keys.iter()
+                .map(|key| (&cf_handle, key.serialize()))
+                .collect::<Vec<_>>(),
+        ) {
+            results.push(
+                if let Some(bytes) = value
+                    .map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
+                {
+                    U::deserialize(&bytes)
+                        .ok_or_else(|| {
+                            Error::InternalError("Failed to deserialize keys.".to_string())
+                        })?
+                        .into()
+                } else {
+                    None
+                },
+            );
+        }
+
+        Ok(results)
+    }
+
+    pub fn get_document_ids(
+        &self,
+        account_id: u32,
+        collection: u8,
+    ) -> crate::Result<Option<RoaringBitmap>> {
+        self.get_bitmap(BitmapKey {
+            account_id,
+            collection,
+            family: BM_DOCUMENT_IDS,
+            field: u8::MAX,
+            key: b"",
+        })
+    }
+
+    #[inline(always)]
+    pub fn get_bitmap(&self, key: BitmapKey) -> crate::Result<Option<RoaringBitmap>> {
+        if let Some(bytes) = self
+            .db
+            .get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), key.serialize())
+            .map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
+        {
+            let bm = RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
+                Error::InternalError(format!("Failed to deserialize key: {:?}", key))
+            })?;
+            Ok(if !bm.is_empty() { Some(bm) } else { None })
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[inline(always)]
+    fn get_bitmaps(&self, keys: Vec<BitmapKey>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
+        let cf_handle = self.db.cf_handle(CF_BITMAPS).unwrap();
+        let mut results = Vec::with_capacity(keys.len());
+        for value in self.db.multi_get_cf(
+            keys.iter()
+                .map(|key| (&cf_handle, key.serialize()))
+                .collect::<Vec<_>>(),
+        ) {
+            results.push(
+                if let Some(bytes) = value
+                    .map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
+                {
+                    RoaringBitmap::deserialize(&bytes)
+                        .ok_or_else(|| {
+                            Error::InternalError("Failed to deserialize keys.".to_string())
+                        })?
+                        .into()
+                } else {
+                    None
+                },
+            );
+        }
+
+        Ok(results)
+    }
+
+    pub(crate) fn get_bitmaps_intersection(
+        &self,
+        keys: Vec<BitmapKey<'_>>,
+    ) -> crate::Result<Option<RoaringBitmap>> {
+        let mut result: Option<RoaringBitmap> = None;
+        for bitmap in self.get_bitmaps(keys)?
{ + if let Some(bitmap) = bitmap { + if let Some(result) = &mut result { + result.bitand_assign(&bitmap); + if result.is_empty() { + break; + } + } else { + result = Some(bitmap); + } + } else { + return Ok(None); + } + } + Ok(result) + } + + pub(crate) fn get_bitmaps_union( + &self, + keys: Vec>, + ) -> crate::Result> { + let mut result: Option = None; + for bitmap in (self.get_bitmaps(keys)?).into_iter().flatten() { + if let Some(result) = &mut result { + result.bitor_assign(&bitmap); + } else { + result = Some(bitmap); + } + } + Ok(result) + } + + pub(crate) fn range_to_bitmap( + &self, + key: IndexKey<'_>, + op: Operator, + ) -> crate::Result> { + let mut bm = RoaringBitmap::new(); + let match_key = key.serialize(); + let match_prefix = &match_key[0..FIELD_PREFIX_LEN]; + let match_value = &match_key[FIELD_PREFIX_LEN..]; + for result in self.db.iterator_cf( + &self.db.cf_handle(CF_INDEXES).unwrap(), + IteratorMode::From( + &match_key, + match op { + Operator::GreaterThan => Direction::Forward, + Operator::GreaterEqualThan => Direction::Forward, + Operator::Equal => Direction::Forward, + _ => Direction::Reverse, + }, + ), + ) { + let (key, _) = result + .map_err(|err| Error::InternalError(format!("iterator_cf failed: {}", err)))?; + if !key.starts_with(match_prefix) { + break; + } + let doc_id_pos = key.len() - std::mem::size_of::(); + let value = key.get(FIELD_PREFIX_LEN..doc_id_pos).ok_or_else(|| { + Error::InternalError("Invalid key found in 'indexes' column family.".to_string()) + })?; + + match op { + Operator::LowerThan if value >= match_value => { + if value == match_value { + continue; + } else { + break; + } + } + Operator::LowerEqualThan if value > match_value => break, + Operator::GreaterThan if value <= match_value => { + if value == match_value { + continue; + } else { + break; + } + } + Operator::GreaterEqualThan if value < match_value => break, + Operator::Equal if value != match_value => break, + _ => { + bm.insert(key.as_ref().deserialize_be_u32(doc_id_pos).ok_or_else(|| { + Error::InternalError( + "Invalid key found in 'indexes' column family.".to_string(), + ) + })?); + } + } + } + + Ok(Some(bm)) + } +} diff --git a/src/fts/index.rs b/src/fts/index.rs new file mode 100644 index 00000000..c7c1b71d --- /dev/null +++ b/src/fts/index.rs @@ -0,0 +1,157 @@ +use std::collections::HashSet; + +use ahash::AHashSet; + +use crate::{ + write::{BatchBuilder, IntoOperations, Operation, Tokenize}, + Error, Serialize, BM_TERM, TERM_EXACT, TERM_STEMMED, +}; + +use super::{ + lang::{LanguageDetector, MIN_LANGUAGE_SCORE}, + stemmer::Stemmer, + term_index::{TermIndexBuilder, TokenIndex}, + Language, +}; + +pub const MAX_TOKEN_LENGTH: usize = 25; + +struct Text<'x> { + field: u8, + text: &'x str, + language: Language, + part_id: u32, +} + +pub struct IndexBuilder<'x> { + parts: Vec>, + detect: LanguageDetector, + default_language: Language, +} + +impl<'x> IndexBuilder<'x> { + pub fn with_default_language(default_language: Language) -> IndexBuilder<'x> { + IndexBuilder { + parts: vec![], + detect: LanguageDetector::new(), + default_language, + } + } + + pub fn index( + &mut self, + field: impl Into, + text: &'x str, + mut language: Language, + part_id: u32, + ) { + if language == Language::Unknown { + language = self.detect.detect(text, MIN_LANGUAGE_SCORE); + } + self.parts.push(Text { + field: field.into(), + text, + language, + part_id, + }); + } +} + +impl<'x> IntoOperations for IndexBuilder<'x> { + fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> { + let default_language = 
self + .detect + .most_frequent_language() + .unwrap_or(self.default_language); + let mut term_index = TermIndexBuilder::new(); + let mut words = HashSet::new(); + + for part in &self.parts { + let language = if part.language != Language::Unknown { + part.language + } else { + default_language + }; + + let mut terms = Vec::new(); + + for token in Stemmer::new(part.text, language, MAX_TOKEN_LENGTH) { + words.insert((token.word.as_bytes().to_vec(), part.field, true)); + + if let Some(stemmed_word) = token.stemmed_word.as_ref() { + words.insert((stemmed_word.as_bytes().to_vec(), part.field, false)); + } + + terms.push(term_index.add_stemmed_token(token)); + } + + if !terms.is_empty() { + term_index.add_terms(part.field, part.part_id, terms); + } + } + + for (key, field, is_exact) in words { + batch.ops.push(Operation::Bitmap { + family: BM_TERM | if is_exact { TERM_EXACT } else { TERM_STEMMED }, + field, + key, + set: true, + }); + } + + if !term_index.is_empty() { + batch.ops.push(Operation::Value { + field: u8::MAX, + set: term_index.serialize().into(), + }); + } + + Ok(()) + } +} + +impl IntoOperations for TokenIndex { + fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> { + let mut tokens = AHashSet::new(); + + for term in self.terms { + for (term_ids, is_exact) in [(term.exact_terms, true), (term.stemmed_terms, false)] { + for term_id in term_ids { + tokens.insert(( + term.field_id, + is_exact, + self.tokens + .get(term_id as usize) + .ok_or_else(|| { + Error::InternalError("Corrupted term index.".to_string()) + })? + .as_bytes() + .to_vec(), + )); + } + } + } + + for (field, is_exact, key) in tokens { + batch.ops.push(Operation::Bitmap { + family: BM_TERM | if is_exact { TERM_EXACT } else { TERM_STEMMED }, + field, + key, + set: false, + }); + } + + batch.ops.push(Operation::Value { + field: u8::MAX, + set: None, + }); + + Ok(()) + } +} + +impl Tokenize for TermIndexBuilder { + fn tokenize(&self) -> HashSet> { + unreachable!() + } +} diff --git a/src/fts/lang.rs b/src/fts/lang.rs new file mode 100644 index 00000000..e0941d3b --- /dev/null +++ b/src/fts/lang.rs @@ -0,0 +1,252 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +use ahash::AHashMap; +use whatlang::{detect, Lang}; + +use super::Language; + +pub const MIN_LANGUAGE_SCORE: f64 = 0.5; + +#[derive(Debug)] +struct WeightedAverage { + weight: usize, + occurrences: usize, + confidence: f64, +} + +#[derive(Debug)] +pub struct LanguageDetector { + lang_detected: AHashMap, +} + +impl Default for LanguageDetector { + fn default() -> Self { + Self::new() + } +} + +impl LanguageDetector { + pub fn new() -> LanguageDetector { + LanguageDetector { + lang_detected: AHashMap::default(), + } + } + + pub fn detect(&mut self, text: &str, min_score: f64) -> Language { + if let Some((language, confidence)) = LanguageDetector::detect_single(text) { + let w = self + .lang_detected + .entry(language) + .or_insert_with(|| WeightedAverage { + weight: 0, + confidence: 0.0, + occurrences: 0, + }); + w.occurrences += 1; + w.weight += text.len(); + w.confidence += confidence * text.len() as f64; + if confidence < min_score { + Language::Unknown + } else { + language + } + } else { + Language::Unknown + } + } + + pub fn most_frequent_language(&self) -> Option { + self.lang_detected + .iter() + .max_by(|(_, a), (_, b)| { + ((a.confidence / a.weight as f64) * a.occurrences as f64) + .partial_cmp(&((b.confidence / b.weight as f64) * b.occurrences as f64)) + .unwrap_or(std::cmp::Ordering::Less) + }) + .map(|(l, _)| *l) + } + + pub fn detect_single(text: &str) -> Option<(Language, f64)> { + detect(text).map(|info| { + ( + match info.lang() { + Lang::Epo => Language::Esperanto, + Lang::Eng => Language::English, + Lang::Rus => Language::Russian, + Lang::Cmn => Language::Mandarin, + Lang::Spa => Language::Spanish, + Lang::Por => Language::Portuguese, + Lang::Ita => Language::Italian, + Lang::Ben => Language::Bengali, + Lang::Fra => Language::French, + Lang::Deu => Language::German, + Lang::Ukr => Language::Ukrainian, + Lang::Kat => Language::Georgian, + Lang::Ara => Language::Arabic, + Lang::Hin => Language::Hindi, + Lang::Jpn => Language::Japanese, + Lang::Heb => Language::Hebrew, + Lang::Yid => Language::Yiddish, + Lang::Pol => Language::Polish, + Lang::Amh => Language::Amharic, + Lang::Jav => Language::Javanese, + Lang::Kor => Language::Korean, + Lang::Nob => Language::Bokmal, + Lang::Dan => Language::Danish, + Lang::Swe => Language::Swedish, + Lang::Fin => Language::Finnish, + Lang::Tur => Language::Turkish, + Lang::Nld => Language::Dutch, + Lang::Hun => Language::Hungarian, + Lang::Ces => Language::Czech, + Lang::Ell => Language::Greek, + Lang::Bul => Language::Bulgarian, + Lang::Bel => Language::Belarusian, + Lang::Mar => Language::Marathi, + Lang::Kan => Language::Kannada, + Lang::Ron => Language::Romanian, + Lang::Slv => Language::Slovene, + Lang::Hrv => Language::Croatian, + Lang::Srp => Language::Serbian, + Lang::Mkd => Language::Macedonian, + Lang::Lit => Language::Lithuanian, + Lang::Lav => Language::Latvian, + Lang::Est => Language::Estonian, + Lang::Tam => Language::Tamil, + Lang::Vie => Language::Vietnamese, + Lang::Urd => Language::Urdu, + Lang::Tha => Language::Thai, + Lang::Guj => Language::Gujarati, + Lang::Uzb => Language::Uzbek, + Lang::Pan => Language::Punjabi, + Lang::Aze => Language::Azerbaijani, + Lang::Ind => Language::Indonesian, + Lang::Tel => Language::Telugu, + Lang::Pes => Language::Persian, + Lang::Mal => Language::Malayalam, + Lang::Ori => Language::Oriya, + Lang::Mya => Language::Burmese, + Lang::Nep => Language::Nepali, + Lang::Sin => Language::Sinhalese, + Lang::Khm => Language::Khmer, + Lang::Tuk => Language::Turkmen, + Lang::Aka => 
Language::Akan, + Lang::Zul => Language::Zulu, + Lang::Sna => Language::Shona, + Lang::Afr => Language::Afrikaans, + Lang::Lat => Language::Latin, + Lang::Slk => Language::Slovak, + Lang::Cat => Language::Catalan, + Lang::Tgl => Language::Tagalog, + Lang::Hye => Language::Armenian, + }, + info.confidence(), + ) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn detect_languages() { + let inputs = [ + ( + "The quick brown fox jumps over the lazy dog", + Language::English, + ), + ( + "Jovencillo emponzoñado de whisky: ¡qué figurota exhibe!", + Language::Spanish, + ), + ( + "Ma la volpe col suo balzo ha raggiunto il quieto Fido", + Language::Italian, + ), + ( + "Jaz em prisão bota que vexa dez cegonhas felizes", + Language::Portuguese, + ), + ( + "Zwölf Boxkämpfer jagten Victor quer über den großen Sylter Deich", + Language::German, + ), + ("עטלף אבק נס דרך מזגן שהתפוצץ כי חם", Language::Hebrew), + ( + "Съешь ещё этих мягких французских булок, да выпей же чаю", + Language::Russian, + ), + ( + "Чуєш їх, доцю, га? Кумедна ж ти, прощайся без ґольфів!", + Language::Ukrainian, + ), + ( + "Љубазни фењерџија чађавог лица хоће да ми покаже штос", + Language::Serbian, + ), + ( + "Pijamalı hasta yağız şoföre çabucak güvendi", + Language::Turkish, + ), + ("己所不欲,勿施于人。", Language::Mandarin), + ("井の中の蛙大海を知らず", Language::Japanese), + ("시작이 반이다", Language::Korean), + ]; + + let mut detector = LanguageDetector::new(); + + for input in inputs.iter() { + assert_eq!(detector.detect(input.0, 0.0), input.1); + } + } + + #[test] + fn weighted_language() { + let mut detector = LanguageDetector::new(); + for lang in [ + (Language::Spanish, 0.5, 70), + (Language::Japanese, 0.2, 100), + (Language::Japanese, 0.3, 100), + (Language::Japanese, 0.4, 200), + (Language::English, 0.7, 50), + ] + .iter() + { + let w = detector + .lang_detected + .entry(lang.0) + .or_insert_with(|| WeightedAverage { + weight: 0, + confidence: 0.0, + occurrences: 0, + }); + w.occurrences += 1; + w.weight += lang.2; + w.confidence += lang.1 * lang.2 as f64; + } + assert_eq!(detector.most_frequent_language(), Some(Language::Japanese)); + } +} diff --git a/src/fts/mod.rs b/src/fts/mod.rs new file mode 100644 index 00000000..1e1c46fe --- /dev/null +++ b/src/fts/mod.rs @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +pub mod lang; +//pub mod pdf; +pub mod index; +pub mod search_snippet; +pub mod stemmer; +pub mod term_index; +pub mod tokenizers; + +#[derive(Debug, PartialEq, Clone, Copy, Hash, Eq, serde::Serialize, serde::Deserialize)] +pub enum Language { + Esperanto = 0, + English = 1, + Russian = 2, + Mandarin = 3, + Spanish = 4, + Portuguese = 5, + Italian = 6, + Bengali = 7, + French = 8, + German = 9, + Ukrainian = 10, + Georgian = 11, + Arabic = 12, + Hindi = 13, + Japanese = 14, + Hebrew = 15, + Yiddish = 16, + Polish = 17, + Amharic = 18, + Javanese = 19, + Korean = 20, + Bokmal = 21, + Danish = 22, + Swedish = 23, + Finnish = 24, + Turkish = 25, + Dutch = 26, + Hungarian = 27, + Czech = 28, + Greek = 29, + Bulgarian = 30, + Belarusian = 31, + Marathi = 32, + Kannada = 33, + Romanian = 34, + Slovene = 35, + Croatian = 36, + Serbian = 37, + Macedonian = 38, + Lithuanian = 39, + Latvian = 40, + Estonian = 41, + Tamil = 42, + Vietnamese = 43, + Urdu = 44, + Thai = 45, + Gujarati = 46, + Uzbek = 47, + Punjabi = 48, + Azerbaijani = 49, + Indonesian = 50, + Telugu = 51, + Persian = 52, + Malayalam = 53, + Oriya = 54, + Burmese = 55, + Nepali = 56, + Sinhalese = 57, + Khmer = 58, + Turkmen = 59, + Akan = 60, + Zulu = 61, + Shona = 62, + Afrikaans = 63, + Latin = 64, + Slovak = 65, + Catalan = 66, + Tagalog = 67, + Armenian = 68, + Unknown = 69, + None = 70, +} + +impl Language { + pub fn from_iso_639(code: &str) -> Option { + match code.split_once('-').map(|c| c.0).unwrap_or(code) { + "en" => Language::English, + "es" => Language::Spanish, + "pt" => Language::Portuguese, + "it" => Language::Italian, + "fr" => Language::French, + "de" => Language::German, + "ru" => Language::Russian, + "zh" => Language::Mandarin, + "ja" => Language::Japanese, + "ar" => Language::Arabic, + "hi" => Language::Hindi, + "ko" => Language::Korean, + "bn" => Language::Bengali, + "he" => Language::Hebrew, + "ur" => Language::Urdu, + "fa" => Language::Persian, + "ml" => Language::Malayalam, + "or" => Language::Oriya, + "my" => Language::Burmese, + "ne" => Language::Nepali, + "si" => Language::Sinhalese, + "km" => Language::Khmer, + "tk" => Language::Turkmen, + "am" => Language::Amharic, + "az" => Language::Azerbaijani, + "id" => Language::Indonesian, + "te" => Language::Telugu, + "ta" => Language::Tamil, + "vi" => Language::Vietnamese, + "gu" => Language::Gujarati, + "pa" => Language::Punjabi, + "uz" => Language::Uzbek, + "hy" => Language::Armenian, + "ka" => Language::Georgian, + "la" => Language::Latin, + "sl" => Language::Slovene, + "hr" => Language::Croatian, + "sr" => Language::Serbian, + "mk" => Language::Macedonian, + "lt" => Language::Lithuanian, + "lv" => Language::Latvian, + "et" => Language::Estonian, + "tl" => Language::Tagalog, + "af" => Language::Afrikaans, + "zu" => Language::Zulu, + "sn" => Language::Shona, + "ak" => Language::Akan, + _ => return None, + } + .into() + } +} diff --git a/src/fts/pdf.rs b/src/fts/pdf.rs new file mode 100644 index 00000000..9d47baff --- /dev/null +++ b/src/fts/pdf.rs @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. 
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
+use std::panic;
+
+use lopdf::Document;
+// Assumption: `output_doc` and `PlainTextOutput` are provided by the
+// `pdf-extract` crate (not yet listed in Cargo.toml; this module is still
+// disabled in fts/mod.rs).
+use pdf_extract::{output_doc, PlainTextOutput};
+
+pub fn extract_pdf(bytes: &[u8]) -> Option<String> {
+    panic::catch_unwind(|| {
+        let mut buf = Vec::<u8>::new();
+        let mut out = PlainTextOutput::new(&mut buf as &mut dyn std::io::Write);
+
+        output_doc(&Document::load_mem(bytes).ok()?, &mut out).ok()?;
+
+        match String::from_utf8(buf) {
+            Ok(result) => result,
+            Err(err) => String::from_utf8_lossy(err.as_bytes()).into_owned(),
+        }
+        .into()
+    })
+    .ok()?
+}
+
+/*
+#[cfg(test)]
+mod tests {
+
+    #[test]
+    fn extract_pdf() {
+        let bytes = include_bytes!("/tmp/pdf/files/ep.pdf");
+        let text = super::extract_pdf(bytes);
+    }
+}
+*/
diff --git a/src/fts/search_snippet.rs b/src/fts/search_snippet.rs
new file mode 100644
index 00000000..18d08d58
--- /dev/null
+++ b/src/fts/search_snippet.rs
@@ -0,0 +1,277 @@
+/*
+ * Copyright (c) 2020-2022, Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart JMAP Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/ + +use super::term_index::Term; + +fn escape_char(c: char, string: &mut String) { + match c { + '&' => string.push_str("&"), + '<' => string.push_str("<"), + '>' => string.push_str(">"), + '"' => string.push_str("""), + '\n' | '\r' => string.push(' '), + _ => string.push(c), + } +} + +fn escape_char_len(c: char) -> usize { + match c { + '&' => "&".len(), + '<' => "<".len(), + '>' => ">".len(), + '"' => """.len(), + '\r' | '\n' => 1, + _ => c.len_utf8(), + } +} + +pub fn generate_snippet(terms: &[Term], text: &str) -> Option { + let mut snippet = String::with_capacity(text.len()); + let start_offset = terms.get(0)?.offset as usize; + + if start_offset > 0 { + let mut word_count = 0; + let mut from_offset = 0; + let mut last_is_space = false; + + if text.len() > 240 { + for (pos, char) in text.get(0..start_offset)?.char_indices().rev() { + // Add up to 2 words or 40 characters of context + if char.is_whitespace() { + if !last_is_space { + word_count += 1; + if word_count == 3 { + break; + } + last_is_space = true; + } + } else { + last_is_space = false; + } + from_offset = pos; + if start_offset - from_offset >= 40 { + break; + } + } + } + + last_is_space = false; + for char in text.get(from_offset..start_offset)?.chars() { + if !char.is_whitespace() { + last_is_space = false; + } else { + if last_is_space { + continue; + } + last_is_space = true; + } + escape_char(char, &mut snippet); + } + } + + let mut terms = terms.iter().peekable(); + + 'outer: while let Some(term) = terms.next() { + if snippet.len() + ("".len() * 2) + term.len as usize + 1 > 255 { + break; + } + + snippet.push_str(""); + snippet.push_str(text.get(term.offset as usize..term.offset as usize + term.len as usize)?); + snippet.push_str(""); + + let next_offset = if let Some(next_term) = terms.peek() { + next_term.offset as usize + } else { + text.len() + }; + + let mut last_is_space = false; + for char in text + .get(term.offset as usize + term.len as usize..next_offset)? + .chars() + { + if !char.is_whitespace() { + last_is_space = false; + } else { + if last_is_space { + continue; + } + last_is_space = true; + } + + if snippet.len() + escape_char_len(char) <= 255 { + escape_char(char, &mut snippet); + } else { + break 'outer; + } + } + } + + Some(snippet) +} + +#[cfg(test)] +mod tests { + + use crate::{ + fts::{ + term_index::{TermIndex, TermIndexBuilder}, + tokenizers::Tokenizer, + Language, + }, + Deserialize, Serialize, + }; + + use super::*; + + #[test] + fn search_snippets() { + let inputs = [ + (vec![ + "Help a friend from Abidjan Côte d'Ivoire", + concat!( + "When my mother died when she was given birth to me, my father took me so ", + "special because I am motherless. Before the death of my late father on 22nd June ", + "2013 in a private hospital here in Abidjan Côte d'Ivoire. He secretly called me on his ", + "bedside and told me that he has a sum of $7.5M (Seven Million five Hundred ", + "Thousand Dollars) left in a suspense account in a local bank here in Abidjan Côte ", + "d'Ivoire, that he used my name as his only daughter for the next of kin in deposit of ", + "the fund. ", + "I am 24year old. Dear I am honorably seeking your assistance in the following ways. ", + "1) To provide any bank account where this money would be transferred into. ", + "2) To serve as the guardian of this fund. ", + "3) To make arrangement for me to come over to your country to further my ", + "education and to secure a residential permit for me in your country. 
", + "Moreover, I am willing to offer you 30 percent of the total sum as compensation for ", + "your effort input after the successful transfer of this fund to your nominated ", + "account overseas." + )], + vec![ + ( + vec!["côte"], + vec![ + "Help a friend from Abidjan Côte d'Ivoire", + concat!( + "in Abidjan Côte d'Ivoire. He secretly called me on his bedside ", + "and told me that he has a sum of $7.5M (Seven Million five Hundred Thousand ", + "Dollars) left in a suspense account in a local bank here in Abidjan ", + "Côte d'Ivoire, that ") + ] + ), + ( + vec!["your", "country"], + vec![ + concat!( + "honorably seeking your assistance in the following ways. ", + "1) To provide any bank account where this money would be transferred into. 2) ", + "To serve as the guardian of this fund. 3) To make arrangement for me to come ", + "over to your " + )] + ), + ( + vec!["overseas"], + vec![ + "nominated account overseas." + ] + ), + + ], + ), + (vec![ + "孫子兵法", + concat!( + "<\"孫子兵法:\">", + "孫子曰:兵者,國之大事,死生之地,存亡之道,不可不察也。", + "孫子曰:凡用兵之法,馳車千駟,革車千乘,帶甲十萬;千里饋糧,則內外之費賓客之用,膠漆之材,", + "車甲之奉,日費千金,然後十萬之師舉矣。", + "孫子曰:凡用兵之法,全國為上,破國次之;全旅為上,破旅次之;全卒為上,破卒次之;全伍為上,破伍次之。", + "是故百戰百勝,非善之善者也;不戰而屈人之兵,善之善者也。", + "孫子曰:昔之善戰者,先為不可勝,以待敵之可勝,不可勝在己,可勝在敵。故善戰者,能為不可勝,不能使敵必可勝。", + "故曰:勝可知,而不可為。", + "兵者,詭道也。故能而示之不能,用而示之不用,近而示之遠,遠而示之近。利而誘之,亂而取之,實而備之,強而避之,", + "怒而撓之,卑而驕之,佚而勞之,親而離之。攻其無備,出其不意,此兵家之勝,不可先傳也。", + "夫未戰而廟算勝者,得算多也;未戰而廟算不勝者,得算少也;多算勝,少算不勝,而況於無算乎?吾以此觀之,勝負見矣。", + "孫子曰:凡治眾如治寡,分數是也。鬥眾如鬥寡,形名是也。三軍之眾,可使必受敵而無敗者,奇正是也。兵之所加,", + "如以碬投卵者,虛實是也。", + )], + vec![ + ( + vec!["孫子兵法"], + vec![ + "孫子兵法", + concat!( + "<"孫子兵法:">孫子曰:兵者,國之大事,死生之地,存亡之道,", + "不可不察也。孫子曰:凡用兵之法,馳車千駟,革車千乘,帶甲十萬;千里饋糧,則內外之費賓客之用,膠"), + ] + ), + ( + vec!["孫子曰"], + vec![ + concat!( + "<"孫子兵法:">孫子曰:兵者,國之大事,死生之地,存亡之道,", + "不可不察也。孫子曰:凡用兵之法,馳車千駟,革車千乘,帶甲十萬;千里饋糧,則內外之費賓", + )] + ), + ], + ), + ]; + + for (parts, tests) in inputs { + let mut builder = TermIndexBuilder::new(); + + for (field_num, part) in parts.iter().enumerate() { + let mut terms = Vec::new(); + for token in Tokenizer::new(part, Language::English, 40) { + terms.push(builder.add_token(token)); + } + builder.add_terms(field_num as u8, 0, terms); + } + + let compressed_term_index = builder.serialize(); + let term_index = TermIndex::deserialize(&compressed_term_index[..]).unwrap(); + + for (match_words, snippets) in tests { + let mut match_terms = Vec::new(); + for word in &match_words { + match_terms.push(term_index.get_match_term(word, None)); + } + + let term_groups = term_index + .match_terms(&match_terms, None, false, true, true) + .unwrap() + .unwrap(); + + assert_eq!(term_groups.len(), snippets.len()); + + for (term_group, snippet) in term_groups.iter().zip(snippets.iter()) { + assert_eq!( + snippet, + &generate_snippet(&term_group.terms, parts[term_group.field_id as usize]) + .unwrap() + ); + } + } + } + } +} diff --git a/src/fts/stemmer.rs b/src/fts/stemmer.rs new file mode 100644 index 00000000..e341f52f --- /dev/null +++ b/src/fts/stemmer.rs @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::borrow::Cow; + +use rust_stemmers::Algorithm; + +use super::{tokenizers::Tokenizer, Language}; + +#[derive(Debug, PartialEq, Eq)] +pub struct StemmedToken<'x> { + pub word: Cow<'x, str>, + pub stemmed_word: Option>, + pub offset: u32, // Word offset in the text part + pub len: u8, // Word length +} + +pub struct Stemmer<'x> { + stemmer: Option, + tokenizer: Tokenizer<'x>, +} + +impl<'x> Stemmer<'x> { + pub fn new(text: &'x str, language: Language, max_token_length: usize) -> Stemmer<'x> { + Stemmer { + tokenizer: Tokenizer::new(text, language, max_token_length), + stemmer: STEMMER_MAP[language as usize].map(rust_stemmers::Stemmer::create), + } + } +} + +impl<'x> Iterator for Stemmer<'x> { + type Item = StemmedToken<'x>; + + fn next(&mut self) -> Option { + let token = self.tokenizer.next()?; + Some(StemmedToken { + stemmed_word: self.stemmer.as_ref().and_then(|stemmer| { + match stemmer.stem(&token.word) { + Cow::Owned(text) if text.len() != token.len as usize || text != token.word => { + Some(text.into()) + } + _ => None, + } + }), + word: token.word, + offset: token.offset, + len: token.len, + }) + } +} + +static STEMMER_MAP: &[Option] = &[ + None, // Esperanto = 0, + Some(Algorithm::English), // English = 1, + Some(Algorithm::Russian), // Russian = 2, + None, // Mandarin = 3, + Some(Algorithm::Spanish), // Spanish = 4, + Some(Algorithm::Portuguese), // Portuguese = 5, + Some(Algorithm::Italian), // Italian = 6, + None, // Bengali = 7, + Some(Algorithm::French), // French = 8, + Some(Algorithm::German), // German = 9, + None, // Ukrainian = 10, + None, // Georgian = 11, + Some(Algorithm::Arabic), // Arabic = 12, + None, // Hindi = 13, + None, // Japanese = 14, + None, // Hebrew = 15, + None, // Yiddish = 16, + None, // Polish = 17, + None, // Amharic = 18, + None, // Javanese = 19, + None, // Korean = 20, + Some(Algorithm::Norwegian), // Bokmal = 21, + Some(Algorithm::Danish), // Danish = 22, + Some(Algorithm::Swedish), // Swedish = 23, + Some(Algorithm::Finnish), // Finnish = 24, + Some(Algorithm::Turkish), // Turkish = 25, + Some(Algorithm::Dutch), // Dutch = 26, + Some(Algorithm::Hungarian), // Hungarian = 27, + None, // Czech = 28, + Some(Algorithm::Greek), // Greek = 29, + None, // Bulgarian = 30, + None, // Belarusian = 31, + None, // Marathi = 32, + None, // Kannada = 33, + Some(Algorithm::Romanian), // Romanian = 34, + None, // Slovene = 35, + None, // Croatian = 36, + None, // Serbian = 37, + None, // Macedonian = 38, + None, // Lithuanian = 39, + None, // Latvian = 40, + None, // Estonian = 41, + Some(Algorithm::Tamil), // Tamil = 42, + None, // Vietnamese = 43, + None, // Urdu = 44, + None, // Thai = 45, + None, // Gujarati = 46, + None, // Uzbek = 47, + None, // Punjabi = 48, + None, // Azerbaijani = 49, + None, // Indonesian = 50, + None, // Telugu = 51, + None, // Persian = 52, + None, // Malayalam = 53, + None, // Oriya = 54, + None, // Burmese = 
55, + None, // Nepali = 56, + None, // Sinhalese = 57, + None, // Khmer = 58, + None, // Turkmen = 59, + None, // Akan = 60, + None, // Zulu = 61, + None, // Shona = 62, + None, // Afrikaans = 63, + None, // Latin = 64, + None, // Slovak = 65, + None, // Catalan = 66, + None, // Tagalog = 67, + None, // Armenian = 68, + None, // Unknown = 69, +]; + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn stemmer() { + let inputs = [ + ( + "love loving lovingly loved lovely", + Language::English, + "love", + ), + ("querer queremos quer", Language::Spanish, "quer"), + ]; + + for (input, language, result) in inputs { + for token in Stemmer::new(input, language, 40) { + assert_eq!(token.stemmed_word.unwrap_or(token.word), result); + } + } + } +} diff --git a/src/fts/term_index.rs b/src/fts/term_index.rs new file mode 100644 index 00000000..551d6c9c --- /dev/null +++ b/src/fts/term_index.rs @@ -0,0 +1,995 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +use std::convert::TryInto; + +use crate::{Deserialize, Serialize}; + +use super::{stemmer::StemmedToken, tokenizers::Token}; + +use ahash::{AHashMap, AHashSet}; +use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x}; +use utils::codec::leb128::{Leb128Reader, Leb128Vec}; + +#[derive(Debug)] +pub enum Error { + DataCorruption, + Leb128DecodeError, + BitpackDecodeError, + InvalidArgument, +} + +pub type TermId = u32; +pub type Result = std::result::Result; + +const LENGTH_SIZE: usize = std::mem::size_of::(); + +#[derive(Debug, PartialEq, Eq)] +pub struct Term { + pub id: TermId, + pub id_stemmed: TermId, + pub offset: u32, + pub len: u8, +} + +#[derive(Debug)] +pub struct TermGroup { + pub field_id: u8, + pub part_id: u32, + pub terms: Vec, +} + +#[derive(Debug)] +pub struct TermIndexBuilderItem { + field: u8, + part_id: u32, + terms: Vec, +} + +#[derive(Debug)] +pub struct TermIndexBuilder { + terms: AHashMap, + items: Vec, +} + +#[derive(Debug)] +pub struct TermIndexItem { + pub field_id: u8, + pub part_id: u32, + pub terms_len: usize, + pub terms: Vec, +} + +#[derive(Debug, Default)] +pub struct TermIndex { + pub token_map: AHashMap, + pub items: Vec, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct MatchTerm { + pub id: TermId, + pub id_stemmed: TermId, +} + +#[derive(Clone, Copy)] +struct TermIndexPacker { + bitpacker_1: BitPacker1x, + bitpacker_4: BitPacker4x, + bitpacker_8: BitPacker8x, + block_len: usize, +} + +impl TermIndexPacker { + pub fn with_block_len(block_len: usize) -> Self { + TermIndexPacker { + bitpacker_1: BitPacker1x::new(), + bitpacker_4: BitPacker4x::new(), + bitpacker_8: BitPacker8x::new(), + block_len, + } + } + + pub fn block_len(&mut self, num: usize) { + self.block_len = num; + } +} + +impl BitPacker for TermIndexPacker { + const BLOCK_LEN: usize = 0; + + fn new() -> Self { + TermIndexPacker { + bitpacker_1: BitPacker1x::new(), + bitpacker_4: BitPacker4x::new(), + bitpacker_8: BitPacker8x::new(), + block_len: 1, + } + } + + fn compress(&self, decompressed: &[u32], compressed: &mut [u8], num_bits: u8) -> usize { + match self.block_len { + BitPacker8x::BLOCK_LEN => self + .bitpacker_8 + .compress(decompressed, compressed, num_bits), + BitPacker4x::BLOCK_LEN => self + .bitpacker_4 + .compress(decompressed, compressed, num_bits), + _ => self + .bitpacker_1 + .compress(decompressed, compressed, num_bits), + } + } + + fn compress_sorted( + &self, + initial: u32, + decompressed: &[u32], + compressed: &mut [u8], + num_bits: u8, + ) -> usize { + match self.block_len { + BitPacker8x::BLOCK_LEN => { + self.bitpacker_8 + .compress_sorted(initial, decompressed, compressed, num_bits) + } + BitPacker4x::BLOCK_LEN => { + self.bitpacker_4 + .compress_sorted(initial, decompressed, compressed, num_bits) + } + _ => self + .bitpacker_1 + .compress_sorted(initial, decompressed, compressed, num_bits), + } + } + + fn decompress(&self, compressed: &[u8], decompressed: &mut [u32], num_bits: u8) -> usize { + match self.block_len { + BitPacker8x::BLOCK_LEN => { + self.bitpacker_8 + .decompress(compressed, decompressed, num_bits) + } + BitPacker4x::BLOCK_LEN => { + self.bitpacker_4 + .decompress(compressed, decompressed, num_bits) + } + _ => self + .bitpacker_1 + .decompress(compressed, decompressed, num_bits), + } + } + + fn decompress_sorted( + &self, + initial: u32, + compressed: &[u8], + decompressed: &mut [u32], + num_bits: u8, + ) -> usize { + match self.block_len { + BitPacker8x::BLOCK_LEN => { + self.bitpacker_8 + .decompress_sorted(initial, compressed, 
decompressed, num_bits) + } + BitPacker4x::BLOCK_LEN => { + self.bitpacker_4 + .decompress_sorted(initial, compressed, decompressed, num_bits) + } + _ => self + .bitpacker_1 + .decompress_sorted(initial, compressed, decompressed, num_bits), + } + } + + fn num_bits(&self, decompressed: &[u32]) -> u8 { + match self.block_len { + BitPacker8x::BLOCK_LEN => self.bitpacker_8.num_bits(decompressed), + BitPacker4x::BLOCK_LEN => self.bitpacker_4.num_bits(decompressed), + _ => self.bitpacker_1.num_bits(decompressed), + } + } + + fn num_bits_sorted(&self, initial: u32, decompressed: &[u32]) -> u8 { + match self.block_len { + BitPacker8x::BLOCK_LEN => self.bitpacker_8.num_bits_sorted(initial, decompressed), + BitPacker4x::BLOCK_LEN => self.bitpacker_4.num_bits_sorted(initial, decompressed), + _ => self.bitpacker_1.num_bits_sorted(initial, decompressed), + } + } +} + +#[allow(clippy::new_without_default)] +impl TermIndexBuilder { + pub fn new() -> TermIndexBuilder { + TermIndexBuilder { + items: Vec::new(), + terms: AHashMap::default(), + } + } + + pub fn add_token(&mut self, token: Token) -> Term { + let id = self.terms.len() as u32; + let id = self + .terms + .entry(token.word.into_owned()) + .or_insert_with(|| id); + Term { + id: *id, + id_stemmed: *id, + offset: token.offset, + len: token.len, + } + } + + pub fn add_stemmed_token(&mut self, token: StemmedToken) -> Term { + let id = self.terms.len() as u32; + let id = *self + .terms + .entry(token.word.into_owned()) + .or_insert_with(|| id); + let id_stemmed = if let Some(stemmed_word) = token.stemmed_word { + let id_stemmed = self.terms.len() as u32; + *self + .terms + .entry(stemmed_word.into_owned()) + .or_insert_with(|| id_stemmed) + } else { + id + }; + Term { + id, + id_stemmed, + offset: token.offset, + len: token.len, + } + } + + pub fn add_terms(&mut self, field: u8, part_id: u32, terms: Vec) { + self.items.push(TermIndexBuilderItem { + field, + part_id, + terms, + }); + } + + pub fn is_empty(&self) -> bool { + self.items.is_empty() + } +} + +impl Serialize for TermIndexBuilder { + fn serialize(self) -> Vec { + // Add tokens + if self.terms.is_empty() { + return Vec::new(); + } + let mut terms = vec![""; self.terms.len()]; + let mut terms_len = 0; + for (word, id) in &self.terms { + terms[*id as usize] = word; + terms_len += word.len() + 1; + } + + // Serialize tokens + let mut bytes = Vec::with_capacity( + terms_len + ((self.items.len() / self.terms.len()) * std::mem::size_of::() * 2), + ); + bytes.push_leb128(self.terms.len()); + for terms in terms { + bytes.extend_from_slice(terms.as_bytes()); + bytes.push(0); + } + + // Write terms + let mut bitpacker = TermIndexPacker::new(); + let mut compressed = vec![0u8; 4 * BitPacker8x::BLOCK_LEN]; + + for term_index in &self.items { + let mut ids = Vec::with_capacity(term_index.terms.len() * 2); + let mut offsets = Vec::with_capacity(term_index.terms.len()); + let mut lengths = Vec::with_capacity(term_index.terms.len()); + + let header_pos = bytes.len(); + bytes.extend_from_slice(&[0u8; LENGTH_SIZE]); + bytes.push(term_index.field); + bytes.push_leb128(term_index.part_id); + bytes.push_leb128(term_index.terms.len()); + + let terms_pos = bytes.len(); + + for term in &term_index.terms { + ids.push(term.id); + ids.push(term.id_stemmed); + offsets.push(term.offset); + lengths.push(term.len); + } + + for (chunk, is_sorted) in [(ids, false), (offsets, true)] { + let mut pos = 0; + let len = chunk.len(); + let mut initial_value = 0; + + while pos < len { + let block_len = match len - pos { + 0..=31 => 
0, + 32..=127 => BitPacker1x::BLOCK_LEN, + 128..=255 => BitPacker4x::BLOCK_LEN, + _ => BitPacker8x::BLOCK_LEN, + }; + + if block_len > 0 { + let chunk = &chunk[pos..pos + block_len]; + bitpacker.block_len(block_len); + if is_sorted { + let num_bits: u8 = bitpacker.num_bits_sorted(initial_value, chunk); + let compressed_len = bitpacker.compress_sorted( + initial_value, + chunk, + &mut compressed[..], + num_bits, + ); + bytes.push(num_bits); + bytes.extend_from_slice(&compressed[..compressed_len]); + initial_value = chunk[chunk.len() - 1]; + } else { + let num_bits: u8 = bitpacker.num_bits(chunk); + let compressed_len = + bitpacker.compress(chunk, &mut compressed[..], num_bits); + bytes.push(num_bits); + bytes.extend_from_slice(&compressed[..compressed_len]); + } + + pos += block_len; + } else { + for val in &chunk[pos..] { + bytes.push_leb128(*val); + } + pos = len; + } + } + } + bytes.append(&mut lengths); + + let len = (bytes.len() - terms_pos) as u32; + bytes[header_pos..header_pos + LENGTH_SIZE].copy_from_slice(&len.to_le_bytes()); + } + + bytes + } +} + +impl Deserialize for TermIndex { + fn deserialize(bytes: &[u8]) -> Option { + let (num_tokens, mut pos) = bytes.read_leb128()?; + let mut token_map = AHashMap::with_capacity(num_tokens as usize); + for term_id in 0..num_tokens { + let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?; + token_map.insert( + String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?, + term_id, + ); + pos += nil_pos + 1; + } + + let mut term_index = TermIndex { + items: Vec::new(), + token_map, + }; + + while pos < bytes.len() { + let item_len = + u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize; + pos += LENGTH_SIZE; + + let field = bytes.get(pos)?; + pos += 1; + + let (part_id, bytes_read) = bytes.get(pos..)?.read_leb128()?; + pos += bytes_read; + + let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128()?; + pos += bytes_read; + + term_index.items.push(TermIndexItem { + field_id: *field, + part_id, + terms_len, + terms: bytes.get(pos..pos + item_len)?.to_vec(), + }); + + pos += item_len; + } + + Some(term_index) + } +} + +impl TermIndex { + pub fn get_match_term(&self, word: &str, stemmed_word: Option<&str>) -> MatchTerm { + let id = self.token_map.get(word).copied().unwrap_or(u32::MAX); + let id_stemmed = stemmed_word + .and_then(|word| self.token_map.get(word)) + .copied() + .unwrap_or(id); + + MatchTerm { id, id_stemmed } + } + + fn skip_items(&self, bytes: &[u8], mut remaining_items: usize) -> Result { + let mut pos = 0; + while remaining_items > 0 { + let block_len = match remaining_items { + 0..=31 => 0, + 32..=127 => BitPacker1x::BLOCK_LEN, + 128..=255 => BitPacker4x::BLOCK_LEN, + _ => BitPacker8x::BLOCK_LEN, + }; + + if block_len > 0 { + pos += + ((*bytes.get(pos).ok_or(Error::DataCorruption)? as usize) * block_len / 8) + 1; + remaining_items -= block_len; + } else { + while remaining_items > 0 { + let bytes_read = bytes + .get(pos..) + .ok_or(Error::DataCorruption)? 
+ .skip_leb128() + .ok_or(Error::Leb128DecodeError)?; + + pos += bytes_read; + remaining_items -= 1; + } + } + } + Ok(pos) + } + + fn uncompress_chunk( + bytes: &[u8], + remaining_items: usize, + initial_value: Option, + ) -> Result<(usize, Vec)> { + let block_len = match remaining_items { + 0..=31 => 0, + 32..=127 => BitPacker1x::BLOCK_LEN, + 128..=255 => BitPacker4x::BLOCK_LEN, + _ => BitPacker8x::BLOCK_LEN, + }; + + if block_len > 0 { + let bitpacker = TermIndexPacker::with_block_len(block_len); + let num_bits = *bytes.first().ok_or(Error::DataCorruption)?; + let bytes_read = ((num_bits as usize) * block_len / 8) + 1; + let mut decompressed = vec![0u32; block_len]; + + if let Some(initial_value) = initial_value { + bitpacker.decompress_sorted( + initial_value, + bytes.get(1..bytes_read).ok_or(Error::DataCorruption)?, + &mut decompressed[..], + num_bits, + ); + } else { + bitpacker.decompress( + bytes.get(1..bytes_read).ok_or(Error::DataCorruption)?, + &mut decompressed[..], + num_bits, + ); + } + + Ok((bytes_read, decompressed)) + } else { + let mut decompressed = Vec::with_capacity(remaining_items); + let mut pos = 0; + while decompressed.len() < remaining_items { + let (val, bytes_read) = bytes + .get(pos..) + .ok_or(Error::DataCorruption)? + .read_leb128() + .ok_or(Error::Leb128DecodeError)?; + decompressed.push(val); + pos += bytes_read; + } + Ok((pos, decompressed)) + } + } + + pub fn match_terms( + &self, + match_terms: &[MatchTerm], + match_in: Option>, + match_phrase: bool, + match_many: bool, + include_offsets: bool, + ) -> Result>> { + let mut result = Vec::new(); + + // Safety check to avoid overflowing the bit mask + if !match_phrase && !(1..=64).contains(&match_terms.len()) { + return Err(Error::InvalidArgument); + } + + // Term matching is done using a bit mask, where each bit represents a word. + // Each time a word is matched, the corresponding bit is cleared. + // When all bits are cleared, all matching terms are added to the result list. 
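+        // For example, with three match terms the mask starts as 0b111; matching
+        // the term at position 1 clears bit 1 (leaving 0b101), and a mask of 0
+        // means every requested term has been seen at least once in this part.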
+ let words_mask: u64 = u64::MAX >> (64 - match_terms.len()); + let mut matched_mask = words_mask; + + for item in &self.items { + if let Some(ref match_in) = match_in { + if !match_in.contains(&item.field_id) { + continue; + } + } + + let mut terms = Vec::new(); + let mut partial_match = Vec::new(); + + let mut term_pos = 0; + let mut byte_pos = 0; + + 'term_loop: while term_pos < item.terms_len { + let (bytes_read, chunk) = TermIndex::uncompress_chunk( + item.terms.get(byte_pos..).ok_or(Error::DataCorruption)?, + (item.terms_len * 2) - (term_pos * 2), + None, + )?; + + byte_pos += bytes_read; + + for encoded_term in chunk.chunks_exact(2) { + let term_id = encoded_term[0]; + let term_id_stemmed = encoded_term[1]; + + if match_phrase { + let match_pos = partial_match.len(); + if match_terms[match_pos].id == term_id { + partial_match.push(Term { + id: term_id, + id_stemmed: term_id_stemmed, + offset: term_pos as u32, + len: 0, + }); + if partial_match.len() == match_terms.len() { + terms.append(&mut partial_match); + if !match_many { + break 'term_loop; + } + } + } else if match_pos > 0 { + partial_match.clear(); + } + } else { + 'match_loop: for (match_pos, match_term) in match_terms.iter().enumerate() { + if match_term.id == term_id + || match_term.id == term_id_stemmed + || ((match_term.id_stemmed != match_term.id) + && (match_term.id_stemmed == term_id + || match_term.id_stemmed == term_id_stemmed)) + { + partial_match.push(Term { + id: term_id, + id_stemmed: term_id_stemmed, + offset: term_pos as u32, + len: 0, + }); + + // Clear the bit corresponding to the matched term + matched_mask &= !(1 << match_pos); + break 'match_loop; + } + } + + if !match_many && matched_mask == 0 { + break 'term_loop; + } + } + term_pos += 1; + } + } + + if !match_phrase && !partial_match.is_empty() { + terms.append(&mut partial_match); + } + + if !terms.is_empty() { + if include_offsets { + // Skip any term ids that were not uncompressed + if term_pos < item.terms_len { + byte_pos += self.skip_items( + item.terms.get(byte_pos..).ok_or(Error::DataCorruption)?, + (item.terms_len - term_pos) * 2, + )?; + } + + // Uncompress offsets + let mut term_it = terms.iter_mut(); + let mut term = term_it.next().unwrap(); + let mut initial_value = 0; + term_pos = 0; + + 'outer: while term_pos < item.terms_len { + let (bytes_read, chunk) = TermIndex::uncompress_chunk( + item.terms.get(byte_pos..).ok_or(Error::DataCorruption)?, + item.terms_len - term_pos, + Some(initial_value), + )?; + + initial_value = chunk[chunk.len() - 1]; + byte_pos += bytes_read; + + for offset in chunk.into_iter() { + if term.offset == term_pos as u32 { + term.len = *item + .terms + .get(item.terms.len() - item.terms_len + term.offset as usize) + .ok_or(Error::DataCorruption)?; + term.offset = offset; + if let Some(next_term) = term_it.next() { + term = next_term; + } else { + break 'outer; + } + } + term_pos += 1; + } + } + } + + result.push(TermGroup { + field_id: item.field_id, + part_id: item.part_id, + terms, + }); + + if !match_many { + break; + } + } + } + + Ok(if !result.is_empty() { + Some(result) + } else { + None + }) + } +} + +#[derive(Default)] +pub struct Terms { + pub field_id: u8, + pub exact_terms: AHashSet, + pub stemmed_terms: AHashSet, +} + +pub struct TokenIndex { + pub tokens: Vec, + pub terms: Vec, +} + +impl Deserialize for TokenIndex { + fn deserialize(bytes: &[u8]) -> Option { + let (num_tokens, mut pos) = bytes.read_leb128::()?; + let mut tokens = Vec::with_capacity(num_tokens as usize); + for _ in 0..num_tokens { + let 
nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?; + tokens.push(String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?); + pos += nil_pos + 1; + } + + let mut terms = Vec::new(); + while pos < bytes.len() { + let item_len = + u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize; + pos += LENGTH_SIZE; + + let mut field_terms = Terms { + field_id: *bytes.get(pos)?, + exact_terms: AHashSet::default(), + stemmed_terms: AHashSet::default(), + }; + pos += 1; + + let bytes_read = bytes.get(pos..)?.skip_leb128()?; + pos += bytes_read; + + let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128::()?; + pos += bytes_read; + + let mut term_pos = 0; + let mut byte_pos = pos; + + while term_pos < terms_len { + let (bytes_read, chunk) = TermIndex::uncompress_chunk( + bytes.get(byte_pos..)?, + (terms_len - term_pos) * 2, + None, + ) + .ok()?; + + byte_pos += bytes_read; + + for encoded_term in chunk.chunks_exact(2) { + let term_id = encoded_term[0]; + let term_id_stemmed = encoded_term[1]; + + field_terms.exact_terms.insert(term_id); + if term_id != term_id_stemmed { + field_terms.stemmed_terms.insert(term_id_stemmed); + } + term_pos += 1; + } + } + + terms.push(field_terms); + pos += item_len; + } + + Some(TokenIndex { tokens, terms }) + } +} + +#[cfg(test)] +mod tests { + + use ahash::{AHashMap, AHashSet}; + + use crate::{ + fts::{ + stemmer::Stemmer, + term_index::{TermIndexBuilder, TokenIndex}, + Language, + }, + Deserialize, Serialize, + }; + + use super::TermIndex; + + #[test] + #[allow(clippy::bind_instead_of_map)] + fn term_index() { + const SUBJECT: u8 = 1; + const BODY: u8 = 2; + const ATTACHMENT: u8 = 3; + + let parts = [ + ( + r#"I felt happy because I saw the others were happy + and because I knew I should feel happy, but I wasn’t + really happy."#, + SUBJECT, + ), + ( + r#"But good morning! Good morning to ye and thou! 
I’d + say to all my patients, because I was the worse of the + hypocrites, of all the hypocrites, the cruel and phony + hypocrites, I was the very worst."#, + BODY, + ), + ( + r#"So I said yes to Thomas Clinton and later thought + that I had said yes to God and later still realized I + had said yes only to Thomas Clinton."#, + BODY, + ), + ( + r#"Even if they are djinns, I will get djinns that + can outdjinn them."#, + BODY, + ), + ( + r#"Hatred was spreading everywhere, blood was being + spilled everywhere, wars were breaking out + everywhere."#, + BODY, + ), + ( + r#"Almost nothing was more annoying than having + our wasted time wasted on something not worth + wasting it on."#, + BODY, + ), + ( + r#"The depressed person was in terrible and unceasing + emotional pain, and the impossibility of sharing or + articulating this pain was itself a component of the + pain and a contributing factor in its essential horror."#, + BODY, + ), + ( + r#"Paranoids are not paranoid because they’re paranoid, + but because they keep putting themselves, darn idiots, + deliberately into paranoid situations."#, + BODY, + ), + ( + r#"Because the world is a place of silence, the sky at + night when the birds have gone is a vast silent place."#, + BODY, + ), + ( + r#"There are some things that are so unforgivable that + they make other things easily forgivable."#, + BODY, + ), + ( + r#"I had known loneliness before, and emptiness upon the + moor, but I had never been a NOTHING, a nothing floating + on a nothing, known by nothing, lonelier and colder than + the space between the stars."#, + ATTACHMENT, + ), + ( + r#"You’re an insomniac, you tell yourself: there are + profound truths revealed only to the insomniac by night + like those phosphorescent minerals veined and glimmering + in the dark but coarse and ordinary otherwise; you have + to examine such minerals in the absence of light to + discover their beauty, you tell yourself."#, + ATTACHMENT, + ), + ( + r#"Every person had a star, every star had a friend, + and for every person carrying a star there was someone + else who reflected it, and everyone carried this reflection + like a secret confidante in the heart."#, + ATTACHMENT, + ), + ( + r#"As my grandfather went, arm over arm, his heart making + sour little shudders against his ribs, he kept listening + for a sound, the sound of the tiger, the sound of anything + but his own feet and lungs."#, + ATTACHMENT, + ), + (r#"love loving lovingly loved lovely"#, ATTACHMENT), + ]; + + let mut builder = TermIndexBuilder::new(); + let mut stemmed_word_ids = AHashMap::default(); + + // Build the term index + for (part_id, (text, field_id)) in parts.iter().enumerate() { + let mut terms = Vec::new(); + for token in Stemmer::new(text, Language::English, 40) { + let stemmed_word = if token.stemmed_word.is_some() { + token.stemmed_word.clone() + } else { + None + }; + let term = builder.add_stemmed_token(token); + if let Some(stemmed_word) = stemmed_word { + stemmed_word_ids.insert(term.id_stemmed, stemmed_word.into_owned()); + } + terms.push(term); + } + builder.add_terms(*field_id, part_id as u32, terms); + } + + let compressed_term_index = builder.serialize(); + let term_index = TermIndex::deserialize(&compressed_term_index[..]).unwrap(); + + assert_eq!( + 15, + TokenIndex::deserialize(&compressed_term_index[..]) + .unwrap() + .terms + .len() + ); + + for (words, field_id, match_phrase, match_count) in [ + (vec!["thomas", "clinton"], None, true, 4), + (vec!["was", "the", "worse"], None, true, 3), + (vec!["carri"], 
None, false, 2), + (vec!["nothing", "floating"], None, true, 2), + (vec!["floating", "nothing"], None, false, 6), + (vec!["floating", "nothing"], None, true, 0), + (vec!["noth", "floating"], None, true, 0), + (vec!["noth", "floating"], None, false, 6), + (vec!["realli", "happi"], None, false, 5), + (vec!["really", "happy"], None, true, 2), + (vec!["should", "feel", "happy", "but"], None, true, 4), + ( + vec!["love", "loving", "lovingly", "loved", "lovely"], + Some(ATTACHMENT), + true, + 5, + ), + (vec!["love"], Some(ATTACHMENT), false, 5), + (vec!["but"], None, false, 6), + (vec!["but"], None, true, 6), + ] { + let mut match_terms = Vec::new(); + for word in &words { + let stemmed_token = Stemmer::new(word, Language::English, 40) + .next() + .and_then(|w| w.stemmed_word); + match_terms.push( + term_index.get_match_term(word, stemmed_token.as_ref().map(|w| w.as_ref())), + ); + } + + let result = term_index + .match_terms( + &match_terms, + field_id.and_then(|f| { + let mut h = AHashSet::default(); + h.insert(f); + Some(h) + }), + match_phrase, + true, + true, + ) + .unwrap() + .unwrap_or_default(); + + let mut result_len = 0; + for r in &result { + result_len += r.terms.len(); + } + + if result_len != match_count { + for term_group in result { + let part = &parts[term_group.part_id as usize].0; + println!("-> Part id {}", term_group.part_id); + for term in term_group.terms { + println!( + "[{}] ", + &part[term.offset as usize..term.offset as usize + term.len as usize] + ); + } + } + panic!( + "Expected {}, got {} for words {:?}, match phrase {:?}.", + match_count, result_len, words, match_phrase + ); + } + + for term_group in &result { + 'outer: for term in &term_group.terms { + let text_word = parts[term_group.part_id as usize].0 + [term.offset as usize..term.offset as usize + term.len as usize] + .to_lowercase(); + let token_stemmed_word = if term.id_stemmed != term.id { + stemmed_word_ids.get(&term.id_stemmed) + } else { + None + }; + + for word in words.iter() { + if word == &text_word + || !match_phrase + && word == token_stemmed_word.unwrap_or(&"".to_string()) + { + continue 'outer; + } + } + panic!("({:?}, {}) != {:?}", words, match_phrase, result); + } + } + } + } +} diff --git a/src/fts/tokenizers/chinese.rs b/src/fts/tokenizers/chinese.rs new file mode 100644 index 00000000..5ebbe6b7 --- /dev/null +++ b/src/fts/tokenizers/chinese.rs @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +use std::{borrow::Cow, vec::IntoIter}; + +use jieba_rs::Jieba; + +use super::{word::WordTokenizer, Token}; +use lazy_static::lazy_static; + +lazy_static! { + static ref JIEBA: Jieba = Jieba::new(); +} + +pub struct ChineseTokenizer<'x> { + word_tokenizer: WordTokenizer<'x>, + tokens: IntoIter<&'x str>, + token_offset: usize, + token_len: usize, + token_len_cur: usize, + max_token_length: usize, +} + +impl<'x> ChineseTokenizer<'x> { + pub fn new(text: &str, max_token_length: usize) -> ChineseTokenizer { + ChineseTokenizer { + word_tokenizer: WordTokenizer::new(text), + tokens: Vec::new().into_iter(), + max_token_length, + token_offset: 0, + token_len: 0, + token_len_cur: 0, + } + } +} + +impl<'x> Iterator for ChineseTokenizer<'x> { + type Item = Token<'x>; + + fn next(&mut self) -> Option { + loop { + if let Some(ch_token) = self.tokens.next() { + let offset_start = self.token_offset + self.token_len_cur; + self.token_len_cur += ch_token.len(); + + if ch_token.len() <= self.max_token_length { + return Token::new(offset_start, ch_token.len(), ch_token.into()).into(); + } + } else { + loop { + let (token, is_ascii) = self.word_tokenizer.next()?; + if !is_ascii { + let word = match token.word { + Cow::Borrowed(word) => word, + Cow::Owned(_) => unreachable!(), + }; + self.tokens = JIEBA.cut(word, false).into_iter(); + self.token_offset = token.offset as usize; + self.token_len = token.len as usize; + self.token_len_cur = 0; + break; + } else if token.len as usize <= self.max_token_length { + return token.into(); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn chinese_tokenizer() { + assert_eq!( + ChineseTokenizer::new( + "孫子曰:兵者,國之大事,死生之地,存亡之道,不可不察也。", + 40 + ) + .collect::>(), + vec![ + Token { + word: "孫".into(), + offset: 0, + len: 3 + }, + Token { + word: "子".into(), + offset: 3, + len: 3 + }, + Token { + word: "曰".into(), + offset: 6, + len: 3 + }, + Token { + word: "兵".into(), + offset: 12, + len: 3 + }, + Token { + word: "者".into(), + offset: 15, + len: 3 + }, + Token { + word: "國".into(), + offset: 21, + len: 3 + }, + Token { + word: "之".into(), + offset: 24, + len: 3 + }, + Token { + word: "大事".into(), + offset: 27, + len: 6 + }, + Token { + word: "死".into(), + offset: 36, + len: 3 + }, + Token { + word: "生".into(), + offset: 39, + len: 3 + }, + Token { + word: "之".into(), + offset: 42, + len: 3 + }, + Token { + word: "地".into(), + offset: 45, + len: 3 + }, + Token { + word: "存亡".into(), + offset: 51, + len: 6 + }, + Token { + word: "之".into(), + offset: 57, + len: 3 + }, + Token { + word: "道".into(), + offset: 60, + len: 3 + }, + Token { + word: "不可不".into(), + offset: 66, + len: 9 + }, + Token { + word: "察".into(), + offset: 75, + len: 3 + }, + Token { + word: "也".into(), + offset: 78, + len: 3 + } + ] + ); + } +} diff --git a/src/fts/tokenizers/indo_european.rs b/src/fts/tokenizers/indo_european.rs new file mode 100644 index 00000000..77f476c4 --- /dev/null +++ b/src/fts/tokenizers/indo_european.rs @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::str::CharIndices; + +use super::Token; + +pub struct IndoEuropeanTokenizer<'x> { + max_token_length: usize, + text: &'x str, + iterator: CharIndices<'x>, +} + +impl<'x> IndoEuropeanTokenizer<'x> { + pub fn new(text: &str, max_token_length: usize) -> IndoEuropeanTokenizer { + IndoEuropeanTokenizer { + max_token_length, + text, + iterator: text.char_indices(), + } + } +} + +/// Parses indo-european text into lowercase tokens. +impl<'x> Iterator for IndoEuropeanTokenizer<'x> { + type Item = Token<'x>; + + fn next(&mut self) -> Option { + while let Some((token_start, ch)) = self.iterator.next() { + if ch.is_alphanumeric() { + let mut is_uppercase = ch.is_uppercase(); + let token_end = (&mut self.iterator) + .filter_map(|(pos, ch)| { + if ch.is_alphanumeric() { + if !is_uppercase && ch.is_uppercase() { + is_uppercase = true; + } + None + } else { + pos.into() + } + }) + .next() + .unwrap_or(self.text.len()); + + let token_len = token_end - token_start; + if token_end > token_start && token_len <= self.max_token_length { + return Token::new( + token_start, + token_len, + if is_uppercase { + self.text[token_start..token_end].to_lowercase().into() + } else { + self.text[token_start..token_end].into() + }, + ) + .into(); + } + } + } + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn indo_european_tokenizer() { + let inputs = [ + ( + "The quick brown fox jumps over the lazy dog", + vec![ + Token::new(0, 3, "the".into()), + Token::new(4, 5, "quick".into()), + Token::new(10, 5, "brown".into()), + Token::new(16, 3, "fox".into()), + Token::new(20, 5, "jumps".into()), + Token::new(26, 4, "over".into()), + Token::new(31, 3, "the".into()), + Token::new(35, 4, "lazy".into()), + Token::new(40, 3, "dog".into()), + ], + ), + ( + "Jovencillo EMPONZOÑADO de whisky: ¡qué figurota exhibe!", + vec![ + Token::new(0, 10, "jovencillo".into()), + Token::new(11, 12, "emponzoñado".into()), + Token::new(24, 2, "de".into()), + Token::new(27, 6, "whisky".into()), + Token::new(37, 4, "qué".into()), + Token::new(42, 8, "figurota".into()), + Token::new(51, 6, "exhibe".into()), + ], + ), + ( + "ZWÖLF Boxkämpfer jagten Victor quer über den großen Sylter Deich", + vec![ + Token::new(0, 6, "zwölf".into()), + Token::new(7, 11, "boxkämpfer".into()), + Token::new(19, 6, "jagten".into()), + Token::new(26, 6, "victor".into()), + Token::new(33, 4, "quer".into()), + Token::new(38, 5, "über".into()), + Token::new(44, 3, "den".into()), + Token::new(48, 7, "großen".into()), + Token::new(56, 6, "sylter".into()), + Token::new(63, 5, "deich".into()), + ], + ), + ( + "Съешь ещё этих мягких французских булок, да выпей же чаю", + vec![ + Token::new(0, 10, "съешь".into()), + Token::new(11, 6, "ещё".into()), + Token::new(18, 8, "этих".into()), + Token::new(27, 12, "мягких".into()), + Token::new(40, 22, "французских".into()), + Token::new(63, 10, "булок".into()), + Token::new(75, 4, "да".into()), + 
Token::new(80, 10, "выпей".into()), + Token::new(91, 4, "же".into()), + Token::new(96, 6, "чаю".into()), + ], + ), + ( + "Pijamalı hasta yağız şoföre çabucak güvendi", + vec![ + Token::new(0, 9, "pijamalı".into()), + Token::new(10, 5, "hasta".into()), + Token::new(16, 7, "yağız".into()), + Token::new(24, 8, "şoföre".into()), + Token::new(33, 8, "çabucak".into()), + Token::new(42, 8, "güvendi".into()), + ], + ), + ]; + + for (input, tokens) in inputs.iter() { + for (pos, token) in IndoEuropeanTokenizer::new(input, 40).enumerate() { + assert_eq!(token, tokens[pos]); + } + } + } +} diff --git a/src/fts/tokenizers/japanese.rs b/src/fts/tokenizers/japanese.rs new file mode 100644 index 00000000..608b6bb6 --- /dev/null +++ b/src/fts/tokenizers/japanese.rs @@ -0,0 +1,168 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::vec::IntoIter; + +use super::{word::WordTokenizer, Token}; + +pub struct JapaneseTokenizer<'x> { + word_tokenizer: WordTokenizer<'x>, + tokens: IntoIter, + token_offset: usize, + token_len: usize, + token_len_cur: usize, + max_token_length: usize, +} + +impl<'x> JapaneseTokenizer<'x> { + pub fn new(text: &str, max_token_length: usize) -> JapaneseTokenizer { + JapaneseTokenizer { + word_tokenizer: WordTokenizer::new(text), + tokens: Vec::new().into_iter(), + max_token_length, + token_offset: 0, + token_len: 0, + token_len_cur: 0, + } + } +} + +impl<'x> Iterator for JapaneseTokenizer<'x> { + type Item = Token<'x>; + + fn next(&mut self) -> Option { + loop { + if let Some(jp_token) = self.tokens.next() { + let offset_start = self.token_offset + self.token_len_cur; + self.token_len_cur += jp_token.len(); + + if jp_token.len() <= self.max_token_length { + return Token::new(offset_start, jp_token.len(), jp_token.into()).into(); + } + } else { + loop { + let (token, is_ascii) = self.word_tokenizer.next()?; + if !is_ascii { + self.tokens = tinysegmenter::tokenize(token.word.as_ref()).into_iter(); + self.token_offset = token.offset as usize; + self.token_len = token.len as usize; + self.token_len_cur = 0; + break; + } else if token.len as usize <= self.max_token_length { + return token.into(); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn japanese_tokenizer() { + assert_eq!( + JapaneseTokenizer::new("お先に失礼します あなたの名前は何ですか 123 abc-872", 40) + .collect::>(), + vec![ + Token { + word: "お先".into(), + offset: 0, + len: 6 + }, + Token { + word: "に".into(), + offset: 6, + len: 3 + }, + Token { + word: "失礼".into(), + offset: 9, + len: 6 + }, + Token { + word: "し".into(), + offset: 15, + len: 3 + }, + Token { + 
word: "ます".into(), + offset: 18, + len: 6 + }, + Token { + word: "あなた".into(), + offset: 25, + len: 9 + }, + Token { + word: "の".into(), + offset: 34, + len: 3 + }, + Token { + word: "名前".into(), + offset: 37, + len: 6 + }, + Token { + word: "は".into(), + offset: 43, + len: 3 + }, + Token { + word: "何".into(), + offset: 46, + len: 3 + }, + Token { + word: "です".into(), + offset: 49, + len: 6 + }, + Token { + word: "か".into(), + offset: 55, + len: 3 + }, + Token { + word: "123".into(), + offset: 59, + len: 3 + }, + Token { + word: "abc".into(), + offset: 63, + len: 3 + }, + Token { + word: "872".into(), + offset: 67, + len: 3 + } + ] + ); + } +} diff --git a/src/fts/tokenizers/mod.rs b/src/fts/tokenizers/mod.rs new file mode 100644 index 00000000..32ddd982 --- /dev/null +++ b/src/fts/tokenizers/mod.rs @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +pub mod chinese; +pub mod indo_european; +pub mod japanese; +pub mod word; + +use std::borrow::Cow; + +use self::{ + chinese::ChineseTokenizer, indo_european::IndoEuropeanTokenizer, japanese::JapaneseTokenizer, +}; + +use super::Language; + +#[derive(Debug, PartialEq, Eq)] +pub struct Token<'x> { + pub word: Cow<'x, str>, + pub offset: u32, // Word offset in the text part + pub len: u8, // Word length +} + +impl<'x> Token<'x> { + pub fn new(offset: usize, len: usize, word: Cow<'x, str>) -> Token<'x> { + debug_assert!(offset <= u32::max_value() as usize); + debug_assert!(len <= u8::max_value() as usize); + Token { + offset: offset as u32, + len: len as u8, + word, + } + } +} + +enum LanguageTokenizer<'x> { + IndoEuropean(IndoEuropeanTokenizer<'x>), + Japanese(JapaneseTokenizer<'x>), + Chinese(ChineseTokenizer<'x>), +} + +pub struct Tokenizer<'x> { + tokenizer: LanguageTokenizer<'x>, +} + +impl<'x> Tokenizer<'x> { + pub fn new(text: &'x str, language: Language, max_token_length: usize) -> Self { + Tokenizer { + tokenizer: match language { + Language::Japanese => { + LanguageTokenizer::Japanese(JapaneseTokenizer::new(text, max_token_length)) + } + Language::Mandarin => { + LanguageTokenizer::Chinese(ChineseTokenizer::new(text, max_token_length)) + } + _ => LanguageTokenizer::IndoEuropean(IndoEuropeanTokenizer::new( + text, + max_token_length, + )), + }, + } + } +} + +impl<'x> Iterator for Tokenizer<'x> { + type Item = Token<'x>; + + fn next(&mut self) -> Option { + match &mut self.tokenizer { + LanguageTokenizer::IndoEuropean(tokenizer) => tokenizer.next(), + LanguageTokenizer::Chinese(tokenizer) => tokenizer.next(), + LanguageTokenizer::Japanese(tokenizer) => tokenizer.next(), + } + } +} diff --git a/src/fts/tokenizers/word.rs b/src/fts/tokenizers/word.rs new file mode 100644 index 00000000..385f3975 --- /dev/null +++ b/src/fts/tokenizers/word.rs @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::str::CharIndices; + +use super::Token; + +pub struct WordTokenizer<'x> { + text: &'x str, + iterator: CharIndices<'x>, +} + +impl<'x> WordTokenizer<'x> { + pub fn new(text: &str) -> WordTokenizer { + WordTokenizer { + text, + iterator: text.char_indices(), + } + } +} + +/// Parses text into tokens, used by non-IndoEuropean tokenizers. 
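+/// Each item also carries a `bool` flagging whether the token is pure ASCII,
+/// which lets the CJK tokenizers decide if a word needs further segmentation.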
+impl<'x> Iterator for WordTokenizer<'x> { + type Item = (Token<'x>, bool); + + fn next(&mut self) -> Option { + let mut is_ascii = true; + while let Some((token_start, ch)) = self.iterator.next() { + if ch.is_alphanumeric() { + let token_end = (&mut self.iterator) + .filter_map(|(pos, ch)| { + if ch.is_alphanumeric() { + if is_ascii && !ch.is_ascii() { + is_ascii = false; + } + None + } else { + pos.into() + } + }) + .next() + .unwrap_or(self.text.len()); + + let token_len = token_end - token_start; + if token_end > token_start { + return ( + Token::new( + token_start, + token_len, + self.text[token_start..token_end].into(), + ), + is_ascii, + ) + .into(); + } + } + } + None + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..1c4797aa --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,91 @@ +use rocksdb::{MultiThreaded, OptimisticTransactionDB}; + +pub mod backend; +pub mod fts; +pub mod query; +pub mod write; + +pub struct Store { + db: OptimisticTransactionDB, +} + +pub trait Deserialize: Sized + Sync + Send { + fn deserialize(bytes: &[u8]) -> Option; +} + +pub trait Serialize { + fn serialize(self) -> Vec; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct BitmapKey<'x> { + pub account_id: u32, + pub collection: u8, + pub family: u8, + pub field: u8, + pub key: &'x [u8], +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct IndexKey<'x> { + pub account_id: u32, + pub collection: u8, + pub field: u8, + pub key: &'x [u8], +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub struct ValueKey { + pub account_id: u32, + pub collection: u8, + pub document_id: u32, + pub field: u8, +} + +pub type Result = std::result::Result; + +pub enum Error { + NotFound, + InternalError(String), +} + +pub const BM_DOCUMENT_IDS: u8 = 0; +pub const BM_TERM: u8 = 0x10; +pub const BM_TAG: u8 = 0x20; + +pub const TERM_EXACT: u8 = 0x00; +pub const TERM_STEMMED: u8 = 0x01; +pub const TERM_STRING: u8 = 0x02; +pub const TERM_HASH: u8 = 0x04; + +pub const TAG_ID: u8 = 0x00; +pub const TAG_TEXT: u8 = 0x01; +pub const TAG_STATIC: u8 = 0x02; + +#[cfg(test)] +mod tests { + use rand::Rng; + use roaring::RoaringBitmap; + + use super::*; + + #[test] + fn it_works() { + let mut rb1 = RoaringBitmap::new(); + let mut rb2 = RoaringBitmap::new(); + let total = rand::thread_rng().gen_range(0..100000); + println!("total: {}", total); + + for num in 0..total { + rb1.insert(rand::thread_rng().gen_range(0..u32::MAX)); + rb2.insert(num); + } + + println!("sparse: {}", rb1.serialized_size()); + println!("compact: {}", rb2.serialized_size()); + println!( + "ratio: {}", + rb1.serialized_size() as f64 / rb2.serialized_size() as f64 + ); + } +} diff --git a/src/query/filter.rs b/src/query/filter.rs new file mode 100644 index 00000000..65fa41ad --- /dev/null +++ b/src/query/filter.rs @@ -0,0 +1,299 @@ +use std::{ + borrow::Cow, + ops::{BitAndAssign, BitOrAssign, BitXorAssign}, +}; + +use ahash::AHashSet; +use roaring::RoaringBitmap; + +use crate::{ + fts::{ + index::MAX_TOKEN_LENGTH, stemmer::Stemmer, term_index::TermIndex, tokenizers::Tokenizer, + }, + write::Tokenize, + BitmapKey, Error, IndexKey, Store, ValueKey, BM_TERM, TERM_EXACT, TERM_STEMMED, +}; + +use super::{Filter, ResultSet}; + +struct State { + op: Filter, + bm: Option, +} + +impl Store { + pub fn filter( + &self, + account_id: u32, + collection: u8, + filters: Vec, + ) -> crate::Result { + let document_ids = self + .get_document_ids(account_id, collection)? 
+ .unwrap_or_else(RoaringBitmap::new); + let mut state: State = Filter::And.into(); + let mut stack = Vec::new(); + let mut filters = filters.into_iter().peekable(); + + while let Some(filter) = filters.next() { + match filter { + Filter::HasKeyword { field, value } => { + state.op.apply( + &mut state.bm, + self.get_bitmap(BitmapKey { + account_id, + collection, + family: BM_TERM | TERM_EXACT, + field, + key: value.as_bytes(), + })?, + &document_ids, + ); + } + Filter::HasKeywords { field, value } => { + let tokens = value.tokenize(); + state.op.apply( + &mut state.bm, + self.get_bitmaps_intersection( + tokens + .iter() + .map(|key| BitmapKey { + account_id, + collection, + family: BM_TERM | TERM_EXACT, + field, + key, + }) + .collect(), + )?, + &document_ids, + ); + } + Filter::MatchValue { field, op, value } => { + state.op.apply( + &mut state.bm, + self.range_to_bitmap( + IndexKey { + account_id, + collection, + field, + key: &value, + }, + op, + )?, + &document_ids, + ); + } + Filter::HasText { + field, + text, + language, + match_phrase, + } => { + if match_phrase { + let phrase = Tokenizer::new(&text, language, MAX_TOKEN_LENGTH) + .map(|token| token.word) + .collect::>(); + let mut keys = Vec::with_capacity(phrase.len()); + + for word in &phrase { + let key = BitmapKey { + account_id, + collection, + family: BM_TERM | TERM_EXACT, + field, + key: word.as_bytes(), + }; + if !keys.contains(&key) { + keys.push(key); + } + } + + // Retrieve the Term Index for each candidate and match the exact phrase + if let Some(candidates) = self.get_bitmaps_intersection(keys)? { + let mut results = RoaringBitmap::new(); + for document_id in candidates.iter() { + if let Some(term_index) = self.get_value::(ValueKey { + account_id, + collection, + document_id, + field: u8::MAX, + })? { + if term_index + .match_terms( + &phrase + .iter() + .map(|w| term_index.get_match_term(w, None)) + .collect::>(), + None, + true, + false, + false, + ) + .map_err(|e| { + Error::InternalError(format!( + "Corrupted TermIndex for {}: {:?}", + document_id, e + )) + })? 
+ .is_some() + { + results.insert(document_id); + } + } + } + state.op.apply(&mut state.bm, results.into(), &document_ids); + } else { + state.op.apply(&mut state.bm, None, &document_ids); + } + } else { + let words = Stemmer::new(&text, language, MAX_TOKEN_LENGTH) + .map(|token| (token.word, token.stemmed_word.unwrap_or(Cow::from("")))) + .collect::>(); + let mut requested_keys = AHashSet::default(); + let mut text_bitmap = None; + + for (word, stemmed_word) in &words { + let mut keys = Vec::new(); + + for (word, family) in [ + (word, BM_TERM | TERM_EXACT), + (word, BM_TERM | TERM_STEMMED), + (stemmed_word, BM_TERM | TERM_EXACT), + (stemmed_word, BM_TERM | TERM_STEMMED), + ] { + if !word.is_empty() { + let key = BitmapKey { + account_id, + collection, + family, + field, + key: word.as_bytes(), + }; + if !requested_keys.contains(&key) { + requested_keys.insert(key); + keys.push(key); + } + } + } + + // Term already matched on a previous iteration + if keys.is_empty() { + continue; + } + + Filter::And.apply( + &mut text_bitmap, + self.get_bitmaps_union(keys)?, + &document_ids, + ); + + if text_bitmap.as_ref().unwrap().is_empty() { + break; + } + } + state.op.apply(&mut state.bm, text_bitmap, &document_ids); + } + } + Filter::InBitmap { family, field, key } => { + state.op.apply( + &mut state.bm, + self.get_bitmap(BitmapKey { + account_id, + collection, + family, + field, + key: &key, + })?, + &document_ids, + ); + } + Filter::DocumentSet(set) => { + state.op.apply(&mut state.bm, Some(set), &document_ids); + } + op @ (Filter::And | Filter::Or | Filter::Not) => { + stack.push(state); + state = op.into(); + continue; + } + Filter::End => { + if let Some(mut prev_state) = stack.pop() { + prev_state + .op + .apply(&mut prev_state.bm, state.bm, &document_ids); + state = prev_state; + } else { + break; + } + } + } + + if matches!(state.op, Filter::And) && state.bm.as_ref().unwrap().is_empty() { + while let Some(filter) = filters.peek() { + if matches!(filter, Filter::End) { + break; + } else { + filters.next(); + } + } + } + } + + Ok(ResultSet { + results: state.bm.unwrap_or_else(RoaringBitmap::new), + document_ids, + }) + } +} + +impl Filter { + #[inline(always)] + pub fn apply( + &self, + dest: &mut Option, + mut src: Option, + not_mask: &RoaringBitmap, + ) { + if let Some(dest) = dest { + match self { + Filter::And => { + if let Some(src) = src { + dest.bitand_assign(src); + } else { + dest.clear(); + } + } + Filter::Or => { + if let Some(src) = src { + dest.bitor_assign(src); + } + } + Filter::Not => { + if let Some(mut src) = src { + src.bitxor_assign(not_mask); + dest.bitand_assign(src); + } + } + _ => unreachable!(), + } + } else if let Some(ref mut src_) = src { + if let Filter::Not = self { + src_.bitxor_assign(not_mask); + } + *dest = src; + } else if let Filter::Not = self { + *dest = Some(not_mask.clone()); + } else { + *dest = Some(RoaringBitmap::new()); + } + } +} + +impl From for State { + fn from(value: Filter) -> Self { + Self { + op: value, + bm: None, + } + } +} diff --git a/src/query/mod.rs b/src/query/mod.rs new file mode 100644 index 00000000..576cb3b0 --- /dev/null +++ b/src/query/mod.rs @@ -0,0 +1,160 @@ +pub mod filter; +pub mod sort; + +use roaring::RoaringBitmap; + +use crate::{ + fts::{lang::LanguageDetector, Language}, + Serialize, +}; + +#[derive(Debug, Clone, Copy)] +pub enum Operator { + LowerThan, + LowerEqualThan, + GreaterThan, + GreaterEqualThan, + Equal, +} + +#[derive(Debug)] +pub enum Filter { + HasKeyword { + field: u8, + value: String, + }, + HasKeywords { 
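+        // Unlike `HasKeyword`, every token of `value` must be present:
+        // `Store::filter` intersects the per-token keyword bitmaps.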
+ field: u8, + value: String, + }, + MatchValue { + field: u8, + op: Operator, + value: Vec, + }, + HasText { + field: u8, + text: String, + language: Language, + match_phrase: bool, + }, + InBitmap { + family: u8, + field: u8, + key: Vec, + }, + DocumentSet(RoaringBitmap), + And, + Or, + Not, + End, +} + +#[derive(Debug)] +pub enum Comparator { + Field { field: u8, ascending: bool }, + DocumentSet { set: RoaringBitmap, ascending: bool }, +} + +pub struct ResultSet { + results: RoaringBitmap, + document_ids: RoaringBitmap, +} + +pub struct SortedResultRet { + pub position: i32, + pub ids: Vec, + pub found_anchor: bool, +} + +impl Filter { + pub fn new_condition(field: impl Into, op: Operator, value: impl Serialize) -> Self { + Filter::MatchValue { + field: field.into(), + op, + value: value.serialize(), + } + } + + pub fn eq(field: impl Into, value: impl Serialize) -> Self { + Filter::MatchValue { + field: field.into(), + op: Operator::Equal, + value: value.serialize(), + } + } + + pub fn lt(field: impl Into, value: impl Serialize) -> Self { + Filter::MatchValue { + field: field.into(), + op: Operator::LowerThan, + value: value.serialize(), + } + } + + pub fn le(field: impl Into, value: impl Serialize) -> Self { + Filter::MatchValue { + field: field.into(), + op: Operator::LowerEqualThan, + value: value.serialize(), + } + } + + pub fn gt(field: impl Into, value: impl Serialize) -> Self { + Filter::MatchValue { + field: field.into(), + op: Operator::GreaterThan, + value: value.serialize(), + } + } + + pub fn ge(field: impl Into, value: impl Serialize) -> Self { + Filter::MatchValue { + field: field.into(), + op: Operator::GreaterEqualThan, + value: value.serialize(), + } + } + + pub fn match_text(field: impl Into, mut text: String, mut language: Language) -> Self { + let match_phrase = (text.starts_with('"') && text.ends_with('"')) + || (text.starts_with('\'') && text.ends_with('\'')); + + if !match_phrase && language == Language::Unknown { + language = if let Some((l, t)) = text + .split_once(':') + .and_then(|(l, t)| (Language::from_iso_639(l)?, t.to_string()).into()) + { + text = t; + l + } else { + LanguageDetector::detect_single(&text) + .and_then(|(l, c)| if c > 0.3 { Some(l) } else { None }) + .unwrap_or(Language::Unknown) + }; + } + + Filter::HasText { + field: field.into(), + text, + language, + match_phrase, + } + } +} + +impl Comparator { + pub fn ascending(field: impl Into) -> Self { + Self::Field { + field: field.into(), + ascending: true, + } + } + + pub fn descending(field: impl Into) -> Self { + Self::Field { + field: field.into(), + ascending: false, + } + } +} diff --git a/src/query/sort.rs b/src/query/sort.rs new file mode 100644 index 00000000..823c11b2 --- /dev/null +++ b/src/query/sort.rs @@ -0,0 +1,424 @@ +use std::ops::{BitAndAssign, BitXorAssign}; + +use roaring::RoaringBitmap; +use rocksdb::{ + DBIteratorWithThreadMode, Direction, IteratorMode, MultiThreaded, OptimisticTransactionDB, +}; + +use crate::{ + backend::rocksdb::{ACCOUNT_KEY_LEN, CF_INDEXES}, + write::key::KeySerializer, + Error, Store, +}; + +use super::{Comparator, ResultSet, SortedResultRet}; + +enum IndexType<'x> { + DocumentSet { + set: RoaringBitmap, + it: Option, + }, + DB { + it: Option>>, + prefix: Vec, + start_key: Vec, + ascending: bool, + prev_item: Option, + prev_key: Option>, + }, +} + +struct IndexIterator<'x> { + index: IndexType<'x>, + remaining: RoaringBitmap, + eof: bool, +} + +impl Store { + #[allow(clippy::too_many_arguments)] + pub fn sort( + &self, + account_id: u32, + collection: 
u8, + mut result_set: ResultSet, + comparators: Vec, + limit: usize, + mut position: i32, + anchor: Option, + mut anchor_offset: i32, + ) -> crate::Result { + let has_anchor = anchor.is_some(); + let mut anchor_found = false; + let requested_position = position; + + let mut result = SortedResultRet { + position, + ids: Vec::with_capacity(std::cmp::min(limit, result_set.results.len() as usize)), + found_anchor: true, + }; + let mut iterators = comparators + .into_iter() + .map(|comp| IndexIterator { + index: match comp { + Comparator::Field { field, ascending } => { + let prefix = KeySerializer::new(ACCOUNT_KEY_LEN) + .write(account_id) + .write(collection) + .write(field) + .finalize(); + IndexType::DB { + it: None, + start_key: if !ascending { + let (key_account_id, key_collection, key_field) = if field < u8::MAX + { + (account_id, collection, field + 1) + } else if (collection) < u8::MAX { + (account_id, (collection) + 1, field) + } else { + (account_id + 1, collection, field) + }; + KeySerializer::new(ACCOUNT_KEY_LEN) + .write(key_account_id) + .write(key_collection) + .write(key_field) + .finalize() + } else { + prefix.clone() + }, + prefix, + ascending, + prev_item: None, + prev_key: None, + } + } + Comparator::DocumentSet { mut set, ascending } => IndexType::DocumentSet { + set: if !ascending { + if !set.is_empty() { + set.bitxor_assign(&result_set.document_ids); + set + } else { + result_set.document_ids.clone() + } + } else { + set + }, + it: None, + }, + }, + remaining: std::mem::replace(&mut result_set.results, RoaringBitmap::new()), + eof: false, + }) + .collect::>(); + + let mut current = 0; + + 'outer: loop { + let mut doc_id; + + 'inner: loop { + let (it_opts, mut next_it_opts) = if current < iterators.len() - 1 { + let (iterators_first, iterators_last) = iterators.split_at_mut(current + 1); + ( + iterators_first.last_mut().unwrap(), + iterators_last.first_mut(), + ) + } else { + (&mut iterators[current], None) + }; + + if !matches!(it_opts.index, IndexType::DB { prev_item,.. } if prev_item.is_some()) + { + if it_opts.remaining.is_empty() { + if current > 0 { + current -= 1; + continue 'inner; + } else { + break 'outer; + } + } else if it_opts.remaining.len() == 1 || it_opts.eof { + doc_id = it_opts.remaining.min().unwrap(); + it_opts.remaining.remove(doc_id); + break 'inner; + } + } + + match &mut it_opts.index { + IndexType::DB { + it, + prefix, + start_key, + ascending, + prev_item, + prev_key, + } => { + let it = if let Some(it) = it { + it + } else { + *it = Some(self.db.iterator_cf( + &self.db.cf_handle(CF_INDEXES).unwrap(), + IteratorMode::From( + start_key, + if *ascending { + Direction::Forward + } else { + Direction::Reverse + }, + ), + )); + it.as_mut().unwrap() + }; + + let mut prev_key_prefix = prev_key + .as_ref() + .and_then(|k| k.get(..k.len() - std::mem::size_of::())) + .unwrap_or_default(); + + if let Some(prev_item) = prev_item.take() { + if let Some(next_it_opts) = &mut next_it_opts { + next_it_opts.remaining.insert(prev_item); + } else { + doc_id = prev_item; + break 'inner; + } + } + + let mut is_eof = false; + loop { + if let Some(result) = it.next() { + let (key, _) = result.map_err(|e| { + Error::InternalError(format!("Iterator error: {}", e)) + })?; + if !key.starts_with(prefix) { + *prev_key = None; + is_eof = true; + break; + } + + doc_id = u32::from_be_bytes( + key.get(key.len() - std::mem::size_of::()..) + .ok_or_else(|| { + Error::InternalError("Invalid index entry".to_string()) + })? 
+ .try_into() + .unwrap(), + ); + if it_opts.remaining.contains(doc_id) { + it_opts.remaining.remove(doc_id); + + if let Some(next_it_opts) = &mut next_it_opts { + if let Some(prev_key_) = &*prev_key { + if key.len() != prev_key_.len() + || !key.starts_with(prev_key_prefix) + { + *prev_item = Some(doc_id); + *prev_key = Some(key); + break; + } + } else { + *prev_key = Some(key); + prev_key_prefix = prev_key + .as_ref() + .and_then(|key| { + key.get( + ..key.len() - std::mem::size_of::(), + ) + }) + .ok_or_else(|| { + Error::InternalError( + "Invalid index entry".to_string(), + ) + })?; + } + + next_it_opts.remaining.insert(doc_id); + } else { + // doc id found + break 'inner; + } + } + } else { + is_eof = true; + break; + } + } + + if is_eof { + if let Some(next_it_opts) = &mut next_it_opts { + if !it_opts.remaining.is_empty() { + next_it_opts.remaining |= &it_opts.remaining; + it_opts.remaining.clear(); + } + *prev_key = None; + it_opts.eof = true; + } + } + } + IndexType::DocumentSet { set, it } => { + if let Some(it) = it { + if let Some(_doc_id) = it.next() { + doc_id = _doc_id; + break 'inner; + } + } else { + let mut set = set.clone(); + set.bitand_assign(&it_opts.remaining); + let set_len = set.len(); + if set_len > 0 { + it_opts.remaining.bitxor_assign(&set); + + match &mut next_it_opts { + Some(next_it_opts) if set_len > 1 => { + next_it_opts.remaining = set; + } + _ if set_len == 1 => { + doc_id = set.min().unwrap(); + break 'inner; + } + _ => { + let mut it_ = set.into_iter(); + let result = it_.next(); + *it = Some(it_); + if let Some(result) = result { + doc_id = result; + break 'inner; + } else { + break 'outer; + } + } + } + } else if !it_opts.remaining.is_empty() { + if let Some(ref mut next_it_opts) = next_it_opts { + next_it_opts.remaining = std::mem::take(&mut it_opts.remaining); + } + } + }; + } + }; + + if let Some(next_it_opts) = next_it_opts { + if !next_it_opts.remaining.is_empty() { + if next_it_opts.remaining.len() == 1 { + doc_id = next_it_opts.remaining.min().unwrap(); + next_it_opts.remaining.remove(doc_id); + break 'inner; + } else { + match &mut next_it_opts.index { + IndexType::DB { + it, + start_key, + ascending, + prev_item, + prev_key, + .. + } => { + if let Some(it) = it { + *it = self.db.iterator_cf( + &self.db.cf_handle(CF_INDEXES).unwrap(), + IteratorMode::From( + start_key, + if *ascending { + Direction::Forward + } else { + Direction::Reverse + }, + ), + ); + } + *prev_item = None; + *prev_key = None; + } + IndexType::DocumentSet { it, .. 
} => { + *it = None; + } + } + + current += 1; + next_it_opts.eof = false; + continue 'inner; + } + } + } + + it_opts.eof = true; + + if it_opts.remaining.is_empty() { + if current > 0 { + current -= 1; + } else { + break 'outer; + } + } + } + + // Pagination + if !has_anchor { + if position >= 0 { + if position > 0 { + position -= 1; + } else { + result.ids.push(doc_id); + if limit > 0 && result.ids.len() == limit { + break 'outer; + } + } + } else { + result.ids.push(doc_id); + } + } else if anchor_offset >= 0 { + if !anchor_found { + if &doc_id != anchor.as_ref().unwrap() { + continue 'outer; + } + anchor_found = true; + } + + if anchor_offset > 0 { + anchor_offset -= 1; + } else { + result.ids.push(doc_id); + if limit > 0 && result.ids.len() == limit { + break 'outer; + } + } + } else { + anchor_found = &doc_id == anchor.as_ref().unwrap(); + result.ids.push(doc_id); + + if !anchor_found { + continue 'outer; + } + + position = anchor_offset; + + break 'outer; + } + } + + if !has_anchor || anchor_found { + if !has_anchor && requested_position >= 0 { + result.position = if position == 0 { requested_position } else { 0 }; + } else if position >= 0 { + result.position = position; + } else { + let position = position.unsigned_abs() as usize; + let start_offset = if position < result.ids.len() { + result.ids.len() - position + } else { + 0 + }; + result.position = start_offset as i32; + let end_offset = if limit > 0 { + std::cmp::min(start_offset + limit, result.ids.len()) + } else { + result.ids.len() + }; + + result.ids = result.ids[start_offset..end_offset].to_vec() + } + } else { + result.found_anchor = false; + } + + Ok(result) + } +} diff --git a/src/rocksdb.rs b/src/rocksdb.rs new file mode 100644 index 00000000..9b7f3b42 --- /dev/null +++ b/src/rocksdb.rs @@ -0,0 +1,343 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart JMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. 
+*/ + +use std::{convert::TryInto, path::PathBuf, sync::Arc}; + +use rocksdb::{ + BoundColumnFamily, ColumnFamilyDescriptor, DBIteratorWithThreadMode, MergeOperands, + MultiThreaded, OptimisticTransactionDB, Options, +}; + +use crate::{Deserialize, Error, InnerStore}; + +pub struct RocksDB { + db: OptimisticTransactionDB, +} + +pub struct RocksDBIterator<'x> { + it: DBIteratorWithThreadMode<'x, OptimisticTransactionDB>, +} + +impl Iterator for RocksDBIterator<'_> { + type Item = (Box<[u8]>, Box<[u8]>); + + #[allow(clippy::while_let_on_iterator)] + #[inline(always)] + fn next(&mut self) -> Option { + while let Some(result) = self.it.next() { + if let Ok(item) = result { + return Some(item); + } + } + None + } +} + +impl InnerStore for RocksDB { + type Iterator<'x> = RocksDBIterator<'x>; + + #[inline(always)] + fn delete(&self, cf: crate::ColumnFamily, key: &[u8]) -> crate::Result<()> { + self.db + .delete_cf(&self.cf_handle(cf)?, key) + .map_err(|err| Error::InternalError(format!("delete_cf failed: {}", err))) + } + + #[inline(always)] + fn set(&self, cf: crate::ColumnFamily, key: &[u8], value: &[u8]) -> crate::Result<()> { + self.db + .put_cf(&self.cf_handle(cf)?, key, value) + .map_err(|err| Error::InternalError(format!("put_cf failed: {}", err))) + } + + #[inline(always)] + fn get(&self, cf: crate::ColumnFamily, key: &[u8]) -> crate::Result> + where + U: Deserialize, + { + if let Some(bytes) = self + .db + .get_pinned_cf(&self.cf_handle(cf)?, key) + .map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))? + { + Ok(Some(U::deserialize(&bytes).ok_or_else(|| { + Error::DeserializeError(format!("Failed to deserialize key: {:?}", key)) + })?)) + } else { + Ok(None) + } + } + + #[inline(always)] + fn merge(&self, cf: crate::ColumnFamily, key: &[u8], value: &[u8]) -> crate::Result<()> { + self.db + .merge_cf(&self.cf_handle(cf)?, key, value) + .map_err(|err| Error::InternalError(format!("merge_cf failed: {}", err))) + } + + /* + #[inline(always)] + fn write(&self, batch: Vec) -> crate::Result<()> { + let mut rocks_batch = rocksdb::WriteBatch::default(); + let cf_bitmaps = self.cf_handle(crate::ColumnFamily::Bitmaps)?; + let cf_values = self.cf_handle(crate::ColumnFamily::Values)?; + let cf_indexes = self.cf_handle(crate::ColumnFamily::Indexes)?; + let cf_blobs = self.cf_handle(crate::ColumnFamily::Blobs)?; + let cf_logs = self.cf_handle(crate::ColumnFamily::Logs)?; + + for op in batch { + match op { + WriteOperation::Set { cf, key, value } => { + rocks_batch.put_cf( + match cf { + crate::ColumnFamily::Bitmaps => &cf_bitmaps, + crate::ColumnFamily::Values => &cf_values, + crate::ColumnFamily::Indexes => &cf_indexes, + crate::ColumnFamily::Blobs => &cf_blobs, + crate::ColumnFamily::Logs => &cf_logs, + }, + key, + value, + ); + } + WriteOperation::Delete { cf, key } => { + rocks_batch.delete_cf( + match cf { + crate::ColumnFamily::Bitmaps => &cf_bitmaps, + crate::ColumnFamily::Values => &cf_values, + crate::ColumnFamily::Indexes => &cf_indexes, + crate::ColumnFamily::Blobs => &cf_blobs, + crate::ColumnFamily::Logs => &cf_logs, + }, + key, + ); + } + WriteOperation::Merge { cf, key, value } => { + rocks_batch.merge_cf( + match cf { + crate::ColumnFamily::Bitmaps => &cf_bitmaps, + crate::ColumnFamily::Values => &cf_values, + crate::ColumnFamily::Indexes => &cf_indexes, + crate::ColumnFamily::Blobs => &cf_blobs, + crate::ColumnFamily::Logs => &cf_logs, + }, + key, + value, + ); + } + } + } + self.db + .write(rocks_batch) + .map_err(|err| Error::InternalError(format!("batch write 
failed: {}", err))) + } + + */ + + #[inline(always)] + fn exists(&self, cf: crate::ColumnFamily, key: &[u8]) -> crate::Result { + Ok(self + .db + .get_pinned_cf(&self.cf_handle(cf)?, key) + .map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))? + .is_some()) + } + + #[inline(always)] + fn multi_get( + &self, + cf: crate::ColumnFamily, + keys: Vec, + ) -> crate::Result>> + where + T: Deserialize, + U: AsRef<[u8]>, + { + let cf_handle = self.cf_handle(cf)?; + let mut results = Vec::with_capacity(keys.len()); + for value in self + .db + .multi_get_cf(keys.iter().map(|key| (&cf_handle, key)).collect::>()) + { + results.push( + if let Some(bytes) = value + .map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))? + { + T::deserialize(&bytes) + .ok_or_else(|| { + Error::DeserializeError("Failed to deserialize keys.".to_string()) + })? + .into() + } else { + None + }, + ); + } + + Ok(results) + } + + #[inline(always)] + fn iterator<'x>( + &'x self, + cf: crate::ColumnFamily, + start: &[u8], + direction: crate::Direction, + ) -> crate::Result> { + Ok(RocksDBIterator { + it: self.db.iterator_cf( + &self.cf_handle(cf)?, + rocksdb::IteratorMode::From( + start, + match direction { + crate::Direction::Forward => rocksdb::Direction::Forward, + crate::Direction::Backward => rocksdb::Direction::Reverse, + }, + ), + ), + }) + } + + fn compact(&self, cf: crate::ColumnFamily) -> crate::Result<()> { + self.db + .compact_range_cf(&self.cf_handle(cf)?, None::<&[u8]>, None::<&[u8]>); + Ok(()) + } + + fn open() -> crate::Result { + // Create the database directory if it doesn't exist + let path = PathBuf::from( + "/tmp/rocksdb.test", /*&settings + .get("db-path") + .unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/ + ); + let mut idx_path = path; + idx_path.push("idx"); + std::fs::create_dir_all(&idx_path).map_err(|err| { + Error::InternalError(format!( + "Failed to create index directory {}: {:?}", + idx_path.display(), + err + )) + })?; + + // Bitmaps + let cf_bitmaps = { + let mut cf_opts = Options::default(); + //cf_opts.set_max_write_buffer_number(16); + //cf_opts.set_merge_operator("merge", bitmap_merge, bitmap_partial_merge); + //cf_opts.set_compaction_filter("compact", bitmap_compact); + ColumnFamilyDescriptor::new("bitmaps", cf_opts) + }; + + // Stored values + let cf_values = { + let mut cf_opts = Options::default(); + cf_opts.set_merge_operator_associative("merge", numeric_value_merge); + ColumnFamilyDescriptor::new("values", cf_opts) + }; + + // Secondary indexes + let cf_indexes = { + let cf_opts = Options::default(); + ColumnFamilyDescriptor::new("indexes", cf_opts) + }; + + // Blobs + let cf_blobs = { + let mut cf_opts = Options::default(); + cf_opts.set_enable_blob_files(true); + cf_opts.set_min_blob_size( + 16834, /*settings.parse("blob-min-size").unwrap_or(16384) */ + ); + ColumnFamilyDescriptor::new("blobs", cf_opts) + }; + + // Raft log and change log + let cf_log = { + let cf_opts = Options::default(); + ColumnFamilyDescriptor::new("logs", cf_opts) + }; + + let mut db_opts = Options::default(); + db_opts.create_missing_column_families(true); + db_opts.create_if_missing(true); + + Ok(RocksDB { + db: OptimisticTransactionDB::open_cf_descriptors( + &db_opts, + idx_path, + vec![cf_bitmaps, cf_values, cf_indexes, cf_blobs, cf_log], + ) + .map_err(|e| Error::InternalError(e.into_string()))?, + }) + } + + fn close(&self) -> crate::Result<()> { + self.db + .flush() + .map_err(|e| Error::InternalError(e.to_string()))?; + 
+        self.db.cancel_all_background_work(true);
+        Ok(())
+    }
+}
+
+impl RocksDB {
+    #[inline(always)]
+    fn cf_handle(&self, cf: crate::ColumnFamily) -> crate::Result<Arc<BoundColumnFamily>> {
+        self.db
+            .cf_handle(match cf {
+                crate::ColumnFamily::Bitmaps => "bitmaps",
+                crate::ColumnFamily::Values => "values",
+                crate::ColumnFamily::Indexes => "indexes",
+                crate::ColumnFamily::Blobs => "blobs",
+                crate::ColumnFamily::Logs => "logs",
+            })
+            .ok_or_else(|| {
+                Error::InternalError(format!(
+                    "Failed to get handle for '{:?}' column family.",
+                    cf
+                ))
+            })
+    }
+}
+
+pub fn numeric_value_merge(
+    _key: &[u8],
+    value: Option<&[u8]>,
+    operands: &MergeOperands,
+) -> Option<Vec<u8>> {
+    let mut value = if let Some(value) = value {
+        i64::from_le_bytes(value.try_into().ok()?)
+    } else {
+        0
+    };
+
+    for op in operands.iter() {
+        value += i64::from_le_bytes(op.try_into().ok()?);
+    }
+
+    let mut bytes = Vec::with_capacity(std::mem::size_of::<i64>());
+    bytes.extend_from_slice(&value.to_le_bytes());
+    Some(bytes)
+}
diff --git a/src/write/batch.rs b/src/write/batch.rs new file mode 100644 index 00000000..d4f87409 --- /dev/null +++ b/src/write/batch.rs @@ -0,0 +1,117 @@
+use crate::{BM_TERM, TERM_EXACT};
+
+use super::{
+    Batch, BatchBuilder, HasFlag, IntoBitmap, IntoOperations, Operation, Serialize, Tokenize,
+    F_CLEAR, F_INDEX, F_TOKENIZE, F_VALUE,
+};
+
+impl BatchBuilder {
+    pub fn new() -> Self {
+        Self {
+            ops: Vec::new(),
+            last_account_id: 0,
+            last_document_id: 0,
+            last_collection: 0,
+        }
+    }
+
+    pub fn with_context(
+        &mut self,
+        account_id: u32,
+        document_id: u32,
+        collection: impl Into<u8>,
+    ) -> &mut Self {
+        self.last_account_id = account_id;
+        self.last_document_id = document_id;
+        self.last_collection = collection.into();
+        self.push_context();
+        self
+    }
+
+    #[inline(always)]
+    pub(super) fn push_context(&mut self) {
+        self.ops.push(Operation::WithContext {
+            account_id: self.last_account_id,
+            document_id: self.last_document_id,
+            collection: self.last_collection,
+        });
+    }
+
+    pub fn value(
+        &mut self,
+        field: impl Into<u8>,
+        value: impl Serialize + Tokenize,
+        options: u32,
+    ) -> &mut Self {
+        let field = field.into();
+        let is_set = !options.has_flag(F_CLEAR);
+
+        if options.has_flag(F_TOKENIZE) {
+            for token in value.tokenize() {
+                self.ops.push(Operation::Bitmap {
+                    family: BM_TERM | TERM_EXACT,
+                    field,
+                    key: token,
+                    set: is_set,
+                });
+            }
+        }
+
+        let value = value.serialize();
+
+        if options.has_flag(F_INDEX) {
+            self.ops.push(Operation::Index {
+                field,
+                key: value.clone(),
+                set: is_set,
+            });
+        }
+
+        if options.has_flag(F_VALUE) {
+            self.ops.push(Operation::Value {
+                field,
+                set: if is_set { Some(value) } else { None },
+            });
+        }
+
+        self
+    }
+
+    pub fn bitmap(&mut self, field: impl Into<u8>, value: impl IntoBitmap, options: u32) {
+        let (key, family) = value.into_bitmap();
+        self.ops.push(Operation::Bitmap {
+            family,
+            field: field.into(),
+            key,
+            set: !options.has_flag(F_CLEAR),
+        });
+    }
+
+    pub fn acl(&mut self, to_account_id: u32, acl: Option<impl Serialize>) {
+        self.ops.push(Operation::Acl {
+            to_account_id,
+            set: acl.map(|acl| acl.serialize()),
+        })
+    }
+
+    pub fn blob(&mut self, blob_id: impl Serialize, options: u32) {
+        self.ops.push(Operation::Blob {
+            key: blob_id.serialize(),
+            set: !options.has_flag(F_CLEAR),
+        });
+    }
+
+    pub fn custom(&mut self, value: impl IntoOperations) -> crate::Result<()> {
+        value.build(self)
+    }
+
+    pub fn build(self) -> Batch {
+        Batch { ops: self.ops }
+    }
+}
+
+impl Default for BatchBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
diff --git a/src/write/key.rs
b/src/write/key.rs new file mode 100644 index 00000000..078e23fe --- /dev/null +++ b/src/write/key.rs @@ -0,0 +1,93 @@
+use std::convert::TryInto;
+use utils::codec::leb128::Leb128_;
+
+pub struct KeySerializer {
+    buf: Vec<u8>,
+}
+
+pub trait KeySerialize {
+    fn serialize(&self, buf: &mut Vec<u8>);
+}
+
+pub trait DeserializeBigEndian {
+    fn deserialize_be_u32(&self, index: usize) -> Option<u32>;
+    fn deserialize_be_u64(&self, index: usize) -> Option<u64>;
+}
+
+impl KeySerializer {
+    pub fn new(capacity: usize) -> Self {
+        Self {
+            buf: Vec::with_capacity(capacity),
+        }
+    }
+
+    pub fn write<T: KeySerialize>(mut self, value: T) -> Self {
+        value.serialize(&mut self.buf);
+        self
+    }
+
+    pub fn write_leb128<T: Leb128_>(mut self, value: T) -> Self {
+        T::to_leb128_bytes(value, &mut self.buf);
+        self
+    }
+
+    pub fn finalize(self) -> Vec<u8> {
+        self.buf
+    }
+}
+
+impl KeySerialize for u8 {
+    fn serialize(&self, buf: &mut Vec<u8>) {
+        buf.push(*self);
+    }
+}
+
+impl KeySerialize for &str {
+    fn serialize(&self, buf: &mut Vec<u8>) {
+        buf.extend_from_slice(self.as_bytes());
+    }
+}
+
+impl KeySerialize for &String {
+    fn serialize(&self, buf: &mut Vec<u8>) {
+        buf.extend_from_slice(self.as_bytes());
+    }
+}
+
+impl KeySerialize for &[u8] {
+    fn serialize(&self, buf: &mut Vec<u8>) {
+        buf.extend_from_slice(self);
+    }
+}
+
+impl KeySerialize for u32 {
+    fn serialize(&self, buf: &mut Vec<u8>) {
+        buf.extend_from_slice(&self.to_be_bytes());
+    }
+}
+
+impl KeySerialize for u64 {
+    fn serialize(&self, buf: &mut Vec<u8>) {
+        buf.extend_from_slice(&self.to_be_bytes());
+    }
+}
+
+impl DeserializeBigEndian for &[u8] {
+    fn deserialize_be_u32(&self, index: usize) -> Option<u32> {
+        u32::from_be_bytes(
+            self.get(index..index + std::mem::size_of::<u32>())?
+                .try_into()
+                .ok()?,
+        )
+        .into()
+    }
+
+    fn deserialize_be_u64(&self, index: usize) -> Option<u64> {
+        u64::from_be_bytes(
+            self.get(index..index + std::mem::size_of::<u64>())?
+                .try_into()
+                .ok()?,
+        )
+        .into()
+    }
+}
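For context on how the key module above is meant to be used, here is a minimal sketch (not part of the patch) that builds an index-style key and reads a field back out of it. The account id, collection and field numbers are made-up placeholders, and it assumes that u32 implements the Leb128_ trait from the utils crate, as the write_leb128 call above suggests.

    fn example_key() -> Vec<u8> {
        use store::write::key::{DeserializeBigEndian, KeySerializer};

        // Big-endian integers keep the byte-wise sort order equal to the numeric order,
        // which is what ordered range scans over the Indexes column family rely on.
        let key = KeySerializer::new(16)
            .write(1u32)         // account id (placeholder)
            .write(3u8)          // collection (placeholder)
            .write(7u8)          // field (placeholder)
            .write_leb128(42u32) // document id, LEB128-encoded (assumes u32: Leb128_)
            .finalize();

        // The account id can be recovered from the first four bytes.
        assert_eq!(key.as_slice().deserialize_be_u32(0), Some(1));
        key
    }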
diff --git a/src/write/log.rs b/src/write/log.rs new file mode 100644 index 00000000..85851096 --- /dev/null +++ b/src/write/log.rs @@ -0,0 +1,109 @@
+use ahash::AHashSet;
+use utils::{codec::leb128::Leb128Vec, map::vec_map::VecMap};
+
+use crate::Serialize;
+
+use super::{IntoOperations, Operation};
+
+pub struct ChangeLogBuilder {
+    pub change_id: u64,
+    pub changes: VecMap<u8, Change>,
+}
+
+#[derive(Default)]
+pub struct Change {
+    pub inserts: AHashSet<u64>,
+    pub updates: AHashSet<u64>,
+    pub deletes: AHashSet<u64>,
+    pub child_updates: AHashSet<u64>,
+}
+
+impl ChangeLogBuilder {
+    pub fn with_change_id(change_id: u64) -> ChangeLogBuilder {
+        ChangeLogBuilder {
+            change_id,
+            changes: VecMap::default(),
+        }
+    }
+
+    pub fn log_insert(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
+        self.changes
+            .get_mut_or_insert(collection.into())
+            .inserts
+            .insert(jmap_id.into());
+    }
+
+    pub fn log_update(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
+        self.changes
+            .get_mut_or_insert(collection.into())
+            .updates
+            .insert(jmap_id.into());
+    }
+
+    pub fn log_child_update(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
+        self.changes
+            .get_mut_or_insert(collection.into())
+            .child_updates
+            .insert(jmap_id.into());
+    }
+
+    pub fn log_delete(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
+        self.changes
+            .get_mut_or_insert(collection.into())
+            .deletes
+            .insert(jmap_id.into());
+    }
+
+    pub fn log_move(
+        &mut self,
+        collection: impl Into<u8>,
+        old_jmap_id: impl Into<u64>,
+        new_jmap_id: impl Into<u64>,
+    ) {
+        let change = self.changes.get_mut_or_insert(collection.into());
+        change.deletes.insert(old_jmap_id.into());
+        change.inserts.insert(new_jmap_id.into());
+    }
+}
+
+impl IntoOperations for ChangeLogBuilder {
+    fn build(self, batch: &mut super::BatchBuilder) -> crate::Result<()> {
+        for (collection, changes) in self.changes {
+            if collection != batch.last_collection {
+                batch.last_collection = collection;
+                batch.push_context();
+            }
+
+            batch.ops.push(Operation::Log {
+                change_id: self.change_id,
+                changes: changes.serialize(),
+            });
+        }
+
+        Ok(())
+    }
+}
+
+impl Serialize for Change {
+    fn serialize(self) -> Vec<u8> {
+        let mut buf = Vec::with_capacity(
+            1 + (self.inserts.len()
+                + self.updates.len()
+                + self.child_updates.len()
+                + self.deletes.len()
+                + 4)
+                * std::mem::size_of::<usize>(),
+        );
+
+        buf.push_leb128(self.inserts.len());
+        buf.push_leb128(self.updates.len());
+        buf.push_leb128(self.child_updates.len());
+        buf.push_leb128(self.deletes.len());
+        for list in [self.inserts, self.updates, self.child_updates, self.deletes] {
+            for id in list {
+                buf.push_leb128(id);
+            }
+        }
+        buf
+    }
+}
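To make the intent of the change log concrete, the following sketch (illustrative only, with an arbitrary collection id and document ids, and assuming the crate re-exports the Result alias used internally) records a few changes and folds them into a write batch through the custom()/IntoOperations hook defined in batch.rs:

    fn example_changes(batch: &mut store::write::BatchBuilder) -> store::Result<()> {
        use store::write::log::ChangeLogBuilder;

        let mut changes = ChangeLogBuilder::with_change_id(100);
        changes.log_insert(1u8, 10u64); // document 10 created in collection 1
        changes.log_update(1u8, 11u64); // document 11 modified
        changes.log_delete(1u8, 12u64); // document 12 removed

        // build() pushes one Operation::Log per collection that was touched.
        batch.custom(changes)
    }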
diff --git a/src/write/mod.rs b/src/write/mod.rs new file mode 100644 index 00000000..b68e8d37 --- /dev/null +++ b/src/write/mod.rs @@ -0,0 +1,157 @@
+use std::collections::HashSet;
+
+use crate::Serialize;
+
+pub mod batch;
+pub mod key;
+pub mod log;
+
+pub const F_VALUE: u32 = 1 << 0;
+pub const F_INDEX: u32 = 1 << 1;
+pub const F_TOKENIZE: u32 = 1 << 2;
+pub const F_CLEAR: u32 = 1 << 3;
+
+pub struct Batch {
+    pub ops: Vec<Operation>,
+}
+
+pub struct BatchBuilder {
+    pub last_account_id: u32,
+    pub last_document_id: u32,
+    pub last_collection: u8,
+    pub ops: Vec<Operation>,
+}
+
+pub enum Operation {
+    WithContext {
+        account_id: u32,
+        document_id: u32,
+        collection: u8,
+    },
+    Value {
+        field: u8,
+        set: Option<Vec<u8>>,
+    },
+    Index {
+        field: u8,
+        key: Vec<u8>,
+        set: bool,
+    },
+    Bitmap {
+        family: u8,
+        field: u8,
+        key: Vec<u8>,
+        set: bool,
+    },
+    Blob {
+        key: Vec<u8>,
+        set: bool,
+    },
+    Acl {
+        to_account_id: u32,
+        set: Option<Vec<u8>>,
+    },
+    Log {
+        change_id: u64,
+        changes: Vec<u8>,
+    },
+}
+
+impl Serialize for u32 {
+    fn serialize(self) -> Vec<u8> {
+        self.to_be_bytes().to_vec()
+    }
+}
+
+impl Serialize for u64 {
+    fn serialize(self) -> Vec<u8> {
+        self.to_be_bytes().to_vec()
+    }
+}
+
+impl Serialize for f64 {
+    fn serialize(self) -> Vec<u8> {
+        self.to_be_bytes().to_vec()
+    }
+}
+
+impl Serialize for &str {
+    fn serialize(self) -> Vec<u8> {
+        self.as_bytes().to_vec()
+    }
+}
+
+impl Serialize for String {
+    fn serialize(self) -> Vec<u8> {
+        self.into_bytes()
+    }
+}
+
+trait HasFlag {
+    fn has_flag(&self, flag: u32) -> bool;
+}
+
+impl HasFlag for u32 {
+    #[inline(always)]
+    fn has_flag(&self, flag: u32) -> bool {
+        self & flag == flag
+    }
+}
+
+pub trait Tokenize {
+    fn tokenize(&self) -> HashSet<Vec<u8>>;
+}
+
+impl Tokenize for &str {
+    fn tokenize(&self) -> HashSet<Vec<u8>> {
+        let mut tokens = HashSet::new();
+        let mut token = String::new();
+
+        for ch in self.chars() {
+            if ch.is_alphanumeric() {
+                if ch.is_uppercase() {
+                    token.push(ch.to_lowercase().next().unwrap());
+                } else {
+                    token.push(ch);
+                }
+            } else if !token.is_empty() {
+                tokens.insert(token.into_bytes());
+                token = String::new();
+            }
+        }
+
+        // Flush the trailing token so input ending in an alphanumeric run is not dropped.
+        if !token.is_empty() {
+            tokens.insert(token.into_bytes());
+        }
+
+        tokens
+    }
+}
+
+impl Tokenize for String {
+    fn tokenize(&self) -> HashSet<Vec<u8>> {
+        self.as_str().tokenize()
+    }
+}
+
+impl Tokenize for u32 {
+    fn tokenize(&self) -> HashSet<Vec<u8>> {
+        unreachable!()
+    }
+}
+
+impl Tokenize for u64 {
+    fn tokenize(&self) -> HashSet<Vec<u8>> {
+        unreachable!()
+    }
+}
+
+impl Tokenize for f64 {
+    fn tokenize(&self) -> HashSet<Vec<u8>> {
+        unreachable!()
+    }
+}
+
+pub trait IntoBitmap {
+    fn into_bitmap(self) -> (Vec<u8>, u8);
+}
+
+pub trait IntoOperations {
+    fn build(self, batch: &mut BatchBuilder) -> crate::Result<()>;
+}
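Finally, a rough usage sketch of the batch API defined above, tying the F_* flags together. The account, document, collection and field numbers are placeholders, and it assumes the crate (named store in Cargo.toml) exposes the write module publicly.

    use store::write::{BatchBuilder, F_INDEX, F_TOKENIZE, F_VALUE};

    fn example_batch() -> store::write::Batch {
        let mut builder = BatchBuilder::new();
        builder
            .with_context(1, 42, 3u8) // account 1, document 42, collection 3 (placeholders)
            .value(
                7u8,           // field 7 (placeholder)
                "hello world", // stored, indexed and tokenized into exact-term bitmaps
                F_VALUE | F_INDEX | F_TOKENIZE,
            );
        builder.build()
    }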