This commit is contained in:
Mauro D 2023-03-22 17:41:58 +00:00
parent ca889365c5
commit 8cbe5bebc3
30 changed files with 5407 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
/Cargo.lock

19
Cargo.toml Normal file
View file

@ -0,0 +1,19 @@
[package]
name = "store"
version = "0.1.0"
edition = "2021"
[dependencies]
utils = { path = "../utils" }
rand = "0.8.5"
roaring = "0.10.1"
rocksdb = "0.20.1"
serde = { version = "1.0", features = ["derive"]}
ahash = { version = "0.8.0", features = ["serde"] }
bitpacking = "0.8.4"
lazy_static = "1.4"
whatlang = "0.16" # Language detection
rust-stemmers = "1.2" # Stemmers
tinysegmenter = "0.1" # Japanese tokenizer
jieba-rs = "0.6" # Chinese stemmer

92
pepe.toml Normal file
View file

@ -0,0 +1,92 @@
I have the following SQLite table for storing email data:
CREATE TABLE email (
email_id INTEGER PRIMARY KEY,
blob_id TEXT NOT NULL,
thread_id INTEGER NOT NULL,
size INTEGER NOT NULL,
received_at TIMESTAMP NOT NULL,
message_id TEXT NOT NULL,
in_reply_to TEXT NOT NULL,
sender TEXT NOT NULL,
"from" TEXT NOT NULL,
"to" TEXT NOT NULL,
cc TEXT NOT NULL,
bcc TEXT NOT NULL,
reply_to TEXT NOT NULL,
subject TEXT NOT NULL,
sent_at TIMESTAMP NOT NULL,
has_attachment BOOL NOT NULL,
preview TEXT NOT NULL
);
The mailboxes and keywords for each message are stored in separate tables:
CREATE TABLE email_mailbox (
email_id INTEGER PRIMARY KEY,
mailbox_id INTEGER NOT NULL
);
CREATE TABLE email_keyword (
email_id INTEGER PRIMARY KEY,
keyword TEXT NOT NULL
);
How would you write a SQLite query to list the email IDs of all the emails with the subject "sales", sorted first by whether a message in the same thread has the keyword "draft", then by received_at, and then by has_attachment?
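One possible answer (a sketch only; it assumes "sorted by" means ranking emails whose thread contains a "draft" message first, and that received_at and has_attachment sort in descending order):
SELECT e.email_id
FROM email AS e
WHERE e.subject = 'sales'
ORDER BY
    -- 1 when any message in the same thread carries the 'draft' keyword, else 0
    EXISTS (
        SELECT 1
        FROM email AS e2
        JOIN email_keyword AS k ON k.email_id = e2.email_id
        WHERE e2.thread_id = e.thread_id
          AND k.keyword = 'draft'
    ) DESC,
    e.received_at DESC,
    e.has_attachment DESC;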
[email]
id: INT
blob_id: HASH
thread_id: INT
size: INT
received_at: TIMESTAMP
message_id: TEXT
in_reply_to: TEXT
sender: TEXT
from: TEXT
to: TEXT
cc: TEXT
bcc: TEXT
reply_to: TEXT
subject: TEXT
sent_at: TIMESTAMP
has_attachment: BOOL
preview: TEXT
[email_mailbox]
email_id: INT
mailbox_id: INT
imap_uid: INT
[email_keyword]
email_id: INT
keyword: TEXT
/*
o id
o blobId
o threadId
o mailboxIds
o keywords
o size
o receivedAt
o messageId
o inReplyTo
o sender
o from
o to
o cc
o bcc
o replyTo
o subject
o sentAt
o hasAttachment
o preview
[ "partId", "blobId", "size", "name", "type", "charset",
"disposition", "cid", "language", "location" ]
*/

2
src/backend/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod foundationdb;
pub mod rocksdb;

326
src/backend/rocksdb/bitmap.rs Normal file
View file

@ -0,0 +1,326 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use crate::{Deserialize, Serialize};
use roaring::RoaringBitmap;
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
pub const BIT_SET: u8 = 0x80;
pub const BIT_CLEAR: u8 = 0;
pub const IS_BITLIST: u8 = 0;
pub const IS_BITMAP: u8 = 1;
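// On-disk format: the first byte selects the encoding. `IS_BITMAP` is followed by a
// serialized RoaringBitmap. `IS_BITLIST` is followed by one or more chunks, each
// starting with a header byte whose high bit (`BIT_SET`) marks the chunk as inserts
// or removals and whose low 7 bits hold the item count minus one, followed by that
// many LEB128-encoded document ids.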
#[inline(always)]
pub fn deserialize_bitlist(bm: &mut RoaringBitmap, bytes: &[u8]) {
let mut it = bytes[1..].iter();
'inner: while let Some(header) = it.next() {
let mut items = (header & 0x7F) + 1;
let is_set = (header & BIT_SET) != 0;
while items > 0 {
if let Some(doc_id) = it.next_leb128() {
if is_set {
bm.insert(doc_id);
} else {
bm.remove(doc_id);
}
items -= 1;
} else {
debug_assert!(items == 0, "{:?}", bytes);
break 'inner;
}
}
}
}
#[inline(always)]
pub fn deserialize_bitmap(bytes: &[u8]) -> Option<RoaringBitmap> {
RoaringBitmap::deserialize_unchecked_from(&bytes[1..]).ok()
}
impl Deserialize for RoaringBitmap {
fn deserialize(bytes: &[u8]) -> Option<Self> {
match *bytes.first()? {
IS_BITMAP => deserialize_bitmap(bytes),
IS_BITLIST => {
let mut bm = RoaringBitmap::new();
deserialize_bitlist(&mut bm, bytes);
Some(bm)
}
_ => None,
}
}
}
impl Serialize for RoaringBitmap {
fn serialize(self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(self.serialized_size() + 1);
bytes.push(IS_BITMAP);
let _ = self.serialize_into(&mut bytes);
bytes
}
}
macro_rules! impl_bit {
($single:ident, $many:ident, $flag:ident) => {
#[inline(always)]
pub fn $single(document: u32) -> Vec<u8> {
let mut buf = Vec::with_capacity(std::mem::size_of::<u32>() + 2);
buf.push(IS_BITLIST);
buf.push($flag);
buf.push_leb128(document);
buf
}
#[inline(always)]
pub fn $many<T>(documents: T) -> Vec<u8>
where
T: Iterator<Item = u32>,
{
debug_assert!(documents.size_hint().0 > 0);
let mut buf = Vec::with_capacity(
((std::mem::size_of::<u32>() + 1)
* documents
.size_hint()
.1
.unwrap_or_else(|| documents.size_hint().0))
+ 2,
);
buf.push(IS_BITLIST);
let mut header_pos = 0;
let mut total_docs = 0;
for (pos, document) in documents.enumerate() {
if pos & 0x7F == 0 {
header_pos = buf.len();
buf.push($flag | 0x7F);
}
buf.push_leb128(document);
total_docs = pos;
}
buf[header_pos] = $flag | ((total_docs & 0x7F) as u8);
buf
}
};
}
impl_bit!(set_bit, set_bits, BIT_SET);
impl_bit!(clear_bit, clear_bits, BIT_CLEAR);
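// Builds a single bitlist that mixes insertions and removals: documents flagged `true`
// are written under `BIT_SET` headers, documents flagged `false` under `BIT_CLEAR`
// headers, and the two lists are concatenated (the clear list drops its leading
// `IS_BITLIST` marker when appended).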
#[inline(always)]
pub fn set_clear_bits<T>(documents: T) -> Vec<u8>
where
T: Iterator<Item = (u32, bool)>,
{
debug_assert!(documents.size_hint().0 > 0);
let total_docs = documents
.size_hint()
.1
.unwrap_or_else(|| documents.size_hint().0);
let buf_len = (std::mem::size_of::<u32>() * total_docs) + (total_docs / 0x7F) + 2;
let mut set_buf = Vec::with_capacity(buf_len);
let mut clear_buf = Vec::with_capacity(buf_len);
let mut set_header_pos = 0;
let mut set_total_docs = 0;
let mut clear_header_pos = 0;
let mut clear_total_docs = 0;
set_buf.push(IS_BITLIST);
clear_buf.push(IS_BITLIST);
for (document, is_set) in documents {
if is_set {
if set_total_docs & 0x7F == 0 {
set_header_pos = set_buf.len();
set_buf.push(BIT_SET | 0x7F);
}
set_buf.push_leb128(document);
set_total_docs += 1;
} else {
if clear_total_docs & 0x7F == 0 {
clear_header_pos = clear_buf.len();
clear_buf.push(BIT_CLEAR | 0x7F);
}
clear_buf.push_leb128(document);
clear_total_docs += 1;
}
}
if set_total_docs > 0 {
set_buf[set_header_pos] = BIT_SET | (((set_total_docs - 1) & 0x7F) as u8);
}
if clear_total_docs > 0 {
clear_buf[clear_header_pos] = BIT_CLEAR | (((clear_total_docs - 1) & 0x7F) as u8);
}
if set_total_docs > 0 && clear_total_docs > 0 {
set_buf.extend_from_slice(&clear_buf[1..]);
set_buf
} else if set_total_docs > 0 {
set_buf
} else {
clear_buf
}
}
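// Full-merge entry point shared with the RocksDB merge operator: folds bitmap and
// bitlist operands into the existing value (if any) and re-serializes the result as
// a plain bitmap.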
#[inline(always)]
pub fn bitmap_merge<'x>(
existing_val: Option<&[u8]>,
operands_len: usize,
operands: impl IntoIterator<Item = &'x [u8]>,
) -> Option<Vec<u8>> {
let mut bm = match existing_val {
Some(existing_val) => RoaringBitmap::deserialize(existing_val)?,
None if operands_len == 1 => {
return Some(Vec::from(operands.into_iter().next().unwrap()));
}
_ => RoaringBitmap::new(),
};
for op in operands.into_iter() {
match *op.first()? {
IS_BITMAP => {
if let Some(union_bm) = deserialize_bitmap(op) {
if !bm.is_empty() {
bm |= union_bm;
} else {
bm = union_bm;
}
} else {
debug_assert!(false, "Failed to deserialize bitmap.");
return None;
}
}
IS_BITLIST => {
deserialize_bitlist(&mut bm, op);
}
_ => {
debug_assert!(false, "This should not have happend");
return None;
}
}
}
let mut bytes = Vec::with_capacity(bm.serialized_size() + 1);
bytes.push(IS_BITMAP);
bm.serialize_into(&mut bytes).ok()?;
Some(bytes)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn merge_bitmaps() {
let v1 = set_clear_bits([(1, true), (2, true), (3, false), (4, true)].into_iter());
let v2 = set_clear_bits([(1, false), (4, false)].into_iter());
let v3 = set_clear_bits([(5, true)].into_iter());
assert_eq!(
RoaringBitmap::from_iter([1, 2, 4]),
RoaringBitmap::deserialize(&v1).unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([1, 2, 4]),
RoaringBitmap::deserialize(&bitmap_merge(None, 1, [v1.as_ref()]).unwrap()).unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([2]),
RoaringBitmap::deserialize(&bitmap_merge(None, 2, [v1.as_ref(), v2.as_ref()]).unwrap())
.unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([2, 5]),
RoaringBitmap::deserialize(
&bitmap_merge(None, 3, [v1.as_ref(), v2.as_ref(), v3.as_ref()]).unwrap()
)
.unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([2, 5]),
RoaringBitmap::deserialize(
&bitmap_merge(Some(v1.as_ref()), 2, [v2.as_ref(), v3.as_ref()]).unwrap()
)
.unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([5]),
RoaringBitmap::deserialize(&bitmap_merge(Some(v2.as_ref()), 1, [v3.as_ref()]).unwrap())
.unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([1, 2, 4]),
RoaringBitmap::deserialize(
&bitmap_merge(
Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()),
1,
[v1.as_ref()]
)
.unwrap()
)
.unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([1, 2, 3, 4, 5, 6]),
RoaringBitmap::deserialize(
&bitmap_merge(
Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()),
1,
[RoaringBitmap::from_iter([5, 6]).serialize().as_ref()]
)
.unwrap()
)
.unwrap()
);
assert_eq!(
RoaringBitmap::from_iter([1, 2, 4, 5, 6]),
RoaringBitmap::deserialize(
&bitmap_merge(
Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()),
2,
[
RoaringBitmap::from_iter([5, 6]).serialize().as_ref(),
v1.as_ref()
]
)
.unwrap()
)
.unwrap()
);
}
}

135
src/backend/rocksdb/main.rs Normal file
View file

@ -0,0 +1,135 @@
use std::path::PathBuf;
use roaring::RoaringBitmap;
use rocksdb::{ColumnFamilyDescriptor, MergeOperands, OptimisticTransactionDB, Options};
use crate::{Deserialize, Error, Store};
use super::{CF_BITMAPS, CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES};
impl Store {
pub fn open() -> crate::Result<Self> {
// Create the database directory if it doesn't exist
let path = PathBuf::from(
"/tmp/rocksdb.test", /*&settings
.get("db-path")
.unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/
);
let mut idx_path = path;
idx_path.push("idx");
std::fs::create_dir_all(&idx_path).map_err(|err| {
Error::InternalError(format!(
"Failed to create index directory {}: {:?}",
idx_path.display(),
err
))
})?;
// Bitmaps
let cf_bitmaps = {
let mut cf_opts = Options::default();
//cf_opts.set_max_write_buffer_number(16);
cf_opts.set_merge_operator("merge", bitmap_merge, bitmap_partial_merge);
cf_opts.set_compaction_filter("compact", bitmap_compact);
ColumnFamilyDescriptor::new(CF_BITMAPS, cf_opts)
};
// Stored values
let cf_values = {
let mut cf_opts = Options::default();
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
ColumnFamilyDescriptor::new(CF_VALUES, cf_opts)
};
// Secondary indexes
let cf_indexes = {
let cf_opts = Options::default();
ColumnFamilyDescriptor::new(CF_INDEXES, cf_opts)
};
// Blobs
let cf_blobs = {
let mut cf_opts = Options::default();
cf_opts.set_enable_blob_files(true);
cf_opts.set_min_blob_size(
16384, /*settings.parse("blob-min-size").unwrap_or(16384) */
);
ColumnFamilyDescriptor::new(CF_BLOBS, cf_opts)
};
// Raft log and change log
let cf_log = {
let cf_opts = Options::default();
ColumnFamilyDescriptor::new(CF_LOGS, cf_opts)
};
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
Ok(Store {
db: OptimisticTransactionDB::open_cf_descriptors(
&db_opts,
idx_path,
vec![cf_bitmaps, cf_values, cf_indexes, cf_blobs, cf_log],
)
.map_err(|e| Error::InternalError(e.into_string()))?,
})
}
pub fn close(&self) -> crate::Result<()> {
self.db
.flush()
.map_err(|e| Error::InternalError(e.to_string()))?;
self.db.cancel_all_background_work(true);
Ok(())
}
}
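// Associative merge operator for the values column family: each value and operand is
// treated as a little-endian i64 counter, and merging simply adds them together.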
pub fn numeric_value_merge(
_key: &[u8],
value: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
let mut value = if let Some(value) = value {
i64::from_le_bytes(value.try_into().ok()?)
} else {
0
};
for op in operands.iter() {
value += i64::from_le_bytes(op.try_into().ok()?);
}
let mut bytes = Vec::with_capacity(std::mem::size_of::<i64>());
bytes.extend_from_slice(&value.to_le_bytes());
Some(bytes)
}
pub fn bitmap_merge(
_new_key: &[u8],
existing_val: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
super::bitmap::bitmap_merge(existing_val, operands.len(), operands.into_iter())
}
pub fn bitmap_partial_merge(
_new_key: &[u8],
_existing_val: Option<&[u8]>,
_operands: &MergeOperands,
) -> Option<Vec<u8>> {
// Force a full merge
None
}
pub fn bitmap_compact(
_level: u32,
_key: &[u8],
value: &[u8],
) -> rocksdb::compaction_filter::Decision {
match RoaringBitmap::deserialize(value) {
Some(bm) if bm.is_empty() => rocksdb::compaction_filter::Decision::Remove,
_ => rocksdb::compaction_filter::Decision::Keep,
}
}

50
src/backend/rocksdb/mod.rs Normal file
View file

@ -0,0 +1,50 @@
use crate::{write::key::KeySerializer, BitmapKey, IndexKey, Serialize, ValueKey};
pub mod bitmap;
pub mod main;
pub mod read;
pub const CF_BITMAPS: &str = "b";
pub const CF_VALUES: &str = "v";
pub const CF_LOGS: &str = "l";
pub const CF_BLOBS: &str = "o";
pub const CF_INDEXES: &str = "i";
pub const COLLECTION_PREFIX_LEN: usize = std::mem::size_of::<u32>() + std::mem::size_of::<u8>();
pub const FIELD_PREFIX_LEN: usize = COLLECTION_PREFIX_LEN + std::mem::size_of::<u8>();
pub const ACCOUNT_KEY_LEN: usize =
std::mem::size_of::<u32>() + std::mem::size_of::<u8>() + std::mem::size_of::<u32>();
impl Serialize for IndexKey<'_> {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<IndexKey>() + self.key.len())
.write(self.account_id)
.write(self.collection)
.write(self.field)
.write(self.key)
.finalize()
}
}
impl Serialize for ValueKey {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<ValueKey>())
.write_leb128(self.account_id)
.write(self.collection)
.write_leb128(self.document_id)
.write(self.field)
.finalize()
}
}
impl Serialize for BitmapKey<'_> {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<BitmapKey>() + self.key.len())
.write(self.key)
.write(self.field)
.write(self.collection)
.write(self.family)
.write_leb128(self.account_id)
.finalize()
}
}

217
src/backend/rocksdb/read.rs Normal file
View file

@ -0,0 +1,217 @@
use std::ops::{BitAndAssign, BitOrAssign};
use roaring::RoaringBitmap;
use rocksdb::{Direction, IteratorMode};
use crate::{
query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, IndexKey,
Serialize, Store, ValueKey, BM_DOCUMENT_IDS,
};
use super::{CF_BITMAPS, CF_INDEXES, CF_VALUES, FIELD_PREFIX_LEN};
impl Store {
#[inline(always)]
pub fn get_value<U>(&self, key: ValueKey) -> crate::Result<Option<U>>
where
U: Deserialize,
{
if let Some(bytes) = self
.db
.get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key.serialize())
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
Ok(Some(U::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!("Failed to deserialize key: {:?}", key))
})?))
} else {
Ok(None)
}
}
#[inline(always)]
pub fn get_values<U>(&self, keys: Vec<ValueKey>) -> crate::Result<Vec<Option<U>>>
where
U: Deserialize,
{
let cf_handle = self.db.cf_handle(CF_VALUES).unwrap();
let mut results = Vec::with_capacity(keys.len());
for value in self.db.multi_get_cf(
keys.iter()
.map(|key| (&cf_handle, key.serialize()))
.collect::<Vec<_>>(),
) {
results.push(
if let Some(bytes) = value
.map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
{
U::deserialize(&bytes)
.ok_or_else(|| {
Error::InternalError("Failed to deserialize keys.".to_string())
})?
.into()
} else {
None
},
);
}
Ok(results)
}
pub fn get_document_ids(
&self,
account_id: u32,
collection: u8,
) -> crate::Result<Option<RoaringBitmap>> {
self.get_bitmap(BitmapKey {
account_id,
collection,
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: b"",
})
}
#[inline(always)]
pub fn get_bitmap(&self, key: BitmapKey) -> crate::Result<Option<RoaringBitmap>> {
if let Some(bytes) = self
.db
.get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), key.serialize())
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
let bm = RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!("Failed to deserialize key: {:?}", key))
})?;
Ok(if !bm.is_empty() { Some(bm) } else { None })
} else {
Ok(None)
}
}
#[inline(always)]
fn get_bitmaps(&self, keys: Vec<BitmapKey>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
let cf_handle = self.db.cf_handle(CF_BITMAPS).unwrap();
let mut results = Vec::with_capacity(keys.len());
for value in self.db.multi_get_cf(
keys.iter()
.map(|key| (&cf_handle, key.serialize()))
.collect::<Vec<_>>(),
) {
results.push(
if let Some(bytes) = value
.map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
{
RoaringBitmap::deserialize(&bytes)
.ok_or_else(|| {
Error::InternalError("Failed to deserialize keys.".to_string())
})?
.into()
} else {
None
},
);
}
Ok(results)
}
pub(crate) fn get_bitmaps_intersection(
&self,
keys: Vec<BitmapKey<'_>>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for bitmap in self.get_bitmaps(keys)? {
if let Some(bitmap) = bitmap {
if let Some(result) = &mut result {
result.bitand_assign(&bitmap);
if result.is_empty() {
break;
}
} else {
result = Some(bitmap);
}
} else {
return Ok(None);
}
}
Ok(result)
}
pub(crate) fn get_bitmaps_union(
&self,
keys: Vec<BitmapKey<'_>>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for bitmap in (self.get_bitmaps(keys)?).into_iter().flatten() {
if let Some(result) = &mut result {
result.bitor_assign(&bitmap);
} else {
result = Some(bitmap);
}
}
Ok(result)
}
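// Scans the indexes column family starting at the serialized key: forward for
// Equal/GreaterThan/GreaterEqualThan, backwards otherwise. Keys sharing the
// account/collection/field prefix are compared against the match value, and the
// document id stored in the last four big-endian bytes of each matching key is
// added to the bitmap.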
pub(crate) fn range_to_bitmap(
&self,
key: IndexKey<'_>,
op: Operator,
) -> crate::Result<Option<RoaringBitmap>> {
let mut bm = RoaringBitmap::new();
let match_key = key.serialize();
let match_prefix = &match_key[0..FIELD_PREFIX_LEN];
let match_value = &match_key[FIELD_PREFIX_LEN..];
for result in self.db.iterator_cf(
&self.db.cf_handle(CF_INDEXES).unwrap(),
IteratorMode::From(
&match_key,
match op {
Operator::GreaterThan => Direction::Forward,
Operator::GreaterEqualThan => Direction::Forward,
Operator::Equal => Direction::Forward,
_ => Direction::Reverse,
},
),
) {
let (key, _) = result
.map_err(|err| Error::InternalError(format!("iterator_cf failed: {}", err)))?;
if !key.starts_with(match_prefix) {
break;
}
let doc_id_pos = key.len() - std::mem::size_of::<u32>();
let value = key.get(FIELD_PREFIX_LEN..doc_id_pos).ok_or_else(|| {
Error::InternalError("Invalid key found in 'indexes' column family.".to_string())
})?;
match op {
Operator::LowerThan if value >= match_value => {
if value == match_value {
continue;
} else {
break;
}
}
Operator::LowerEqualThan if value > match_value => break,
Operator::GreaterThan if value <= match_value => {
if value == match_value {
continue;
} else {
break;
}
}
Operator::GreaterEqualThan if value < match_value => break,
Operator::Equal if value != match_value => break,
_ => {
bm.insert(key.as_ref().deserialize_be_u32(doc_id_pos).ok_or_else(|| {
Error::InternalError(
"Invalid key found in 'indexes' column family.".to_string(),
)
})?);
}
}
}
Ok(Some(bm))
}
}

157
src/fts/index.rs Normal file
View file

@ -0,0 +1,157 @@
use std::collections::HashSet;
use ahash::AHashSet;
use crate::{
write::{BatchBuilder, IntoOperations, Operation, Tokenize},
Error, Serialize, BM_TERM, TERM_EXACT, TERM_STEMMED,
};
use super::{
lang::{LanguageDetector, MIN_LANGUAGE_SCORE},
stemmer::Stemmer,
term_index::{TermIndexBuilder, TokenIndex},
Language,
};
pub const MAX_TOKEN_LENGTH: usize = 25;
struct Text<'x> {
field: u8,
text: &'x str,
language: Language,
part_id: u32,
}
pub struct IndexBuilder<'x> {
parts: Vec<Text<'x>>,
detect: LanguageDetector,
default_language: Language,
}
impl<'x> IndexBuilder<'x> {
pub fn with_default_language(default_language: Language) -> IndexBuilder<'x> {
IndexBuilder {
parts: vec![],
detect: LanguageDetector::new(),
default_language,
}
}
pub fn index(
&mut self,
field: impl Into<u8>,
text: &'x str,
mut language: Language,
part_id: u32,
) {
if language == Language::Unknown {
language = self.detect.detect(text, MIN_LANGUAGE_SCORE);
}
self.parts.push(Text {
field: field.into(),
text,
language,
part_id,
});
}
}
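// Building the batch tokenizes and stems every text part, records each exact and
// stemmed word as a bitmap operation under the BM_TERM family, and stores the
// serialized term index as a value under field u8::MAX.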
impl<'x> IntoOperations for IndexBuilder<'x> {
fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> {
let default_language = self
.detect
.most_frequent_language()
.unwrap_or(self.default_language);
let mut term_index = TermIndexBuilder::new();
let mut words = HashSet::new();
for part in &self.parts {
let language = if part.language != Language::Unknown {
part.language
} else {
default_language
};
let mut terms = Vec::new();
for token in Stemmer::new(part.text, language, MAX_TOKEN_LENGTH) {
words.insert((token.word.as_bytes().to_vec(), part.field, true));
if let Some(stemmed_word) = token.stemmed_word.as_ref() {
words.insert((stemmed_word.as_bytes().to_vec(), part.field, false));
}
terms.push(term_index.add_stemmed_token(token));
}
if !terms.is_empty() {
term_index.add_terms(part.field, part.part_id, terms);
}
}
for (key, field, is_exact) in words {
batch.ops.push(Operation::Bitmap {
family: BM_TERM | if is_exact { TERM_EXACT } else { TERM_STEMMED },
field,
key,
set: true,
});
}
if !term_index.is_empty() {
batch.ops.push(Operation::Value {
field: u8::MAX,
set: term_index.serialize().into(),
});
}
Ok(())
}
}
impl IntoOperations for TokenIndex {
fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> {
let mut tokens = AHashSet::new();
for term in self.terms {
for (term_ids, is_exact) in [(term.exact_terms, true), (term.stemmed_terms, false)] {
for term_id in term_ids {
tokens.insert((
term.field_id,
is_exact,
self.tokens
.get(term_id as usize)
.ok_or_else(|| {
Error::InternalError("Corrupted term index.".to_string())
})?
.as_bytes()
.to_vec(),
));
}
}
}
for (field, is_exact, key) in tokens {
batch.ops.push(Operation::Bitmap {
family: BM_TERM | if is_exact { TERM_EXACT } else { TERM_STEMMED },
field,
key,
set: false,
});
}
batch.ops.push(Operation::Value {
field: u8::MAX,
set: None,
});
Ok(())
}
}
impl Tokenize for TermIndexBuilder {
fn tokenize(&self) -> HashSet<Vec<u8>> {
unreachable!()
}
}

252
src/fts/lang.rs Normal file
View file

@ -0,0 +1,252 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use ahash::AHashMap;
use whatlang::{detect, Lang};
use super::Language;
pub const MIN_LANGUAGE_SCORE: f64 = 0.5;
#[derive(Debug)]
struct WeightedAverage {
weight: usize,
occurrences: usize,
confidence: f64,
}
#[derive(Debug)]
pub struct LanguageDetector {
lang_detected: AHashMap<Language, WeightedAverage>,
}
impl Default for LanguageDetector {
fn default() -> Self {
Self::new()
}
}
impl LanguageDetector {
pub fn new() -> LanguageDetector {
LanguageDetector {
lang_detected: AHashMap::default(),
}
}
pub fn detect(&mut self, text: &str, min_score: f64) -> Language {
if let Some((language, confidence)) = LanguageDetector::detect_single(text) {
let w = self
.lang_detected
.entry(language)
.or_insert_with(|| WeightedAverage {
weight: 0,
confidence: 0.0,
occurrences: 0,
});
w.occurrences += 1;
w.weight += text.len();
w.confidence += confidence * text.len() as f64;
if confidence < min_score {
Language::Unknown
} else {
language
}
} else {
Language::Unknown
}
}
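// Picks the language with the highest score, where the score is the length-weighted
// average confidence of its detections multiplied by how often it was detected.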
pub fn most_frequent_language(&self) -> Option<Language> {
self.lang_detected
.iter()
.max_by(|(_, a), (_, b)| {
((a.confidence / a.weight as f64) * a.occurrences as f64)
.partial_cmp(&((b.confidence / b.weight as f64) * b.occurrences as f64))
.unwrap_or(std::cmp::Ordering::Less)
})
.map(|(l, _)| *l)
}
pub fn detect_single(text: &str) -> Option<(Language, f64)> {
detect(text).map(|info| {
(
match info.lang() {
Lang::Epo => Language::Esperanto,
Lang::Eng => Language::English,
Lang::Rus => Language::Russian,
Lang::Cmn => Language::Mandarin,
Lang::Spa => Language::Spanish,
Lang::Por => Language::Portuguese,
Lang::Ita => Language::Italian,
Lang::Ben => Language::Bengali,
Lang::Fra => Language::French,
Lang::Deu => Language::German,
Lang::Ukr => Language::Ukrainian,
Lang::Kat => Language::Georgian,
Lang::Ara => Language::Arabic,
Lang::Hin => Language::Hindi,
Lang::Jpn => Language::Japanese,
Lang::Heb => Language::Hebrew,
Lang::Yid => Language::Yiddish,
Lang::Pol => Language::Polish,
Lang::Amh => Language::Amharic,
Lang::Jav => Language::Javanese,
Lang::Kor => Language::Korean,
Lang::Nob => Language::Bokmal,
Lang::Dan => Language::Danish,
Lang::Swe => Language::Swedish,
Lang::Fin => Language::Finnish,
Lang::Tur => Language::Turkish,
Lang::Nld => Language::Dutch,
Lang::Hun => Language::Hungarian,
Lang::Ces => Language::Czech,
Lang::Ell => Language::Greek,
Lang::Bul => Language::Bulgarian,
Lang::Bel => Language::Belarusian,
Lang::Mar => Language::Marathi,
Lang::Kan => Language::Kannada,
Lang::Ron => Language::Romanian,
Lang::Slv => Language::Slovene,
Lang::Hrv => Language::Croatian,
Lang::Srp => Language::Serbian,
Lang::Mkd => Language::Macedonian,
Lang::Lit => Language::Lithuanian,
Lang::Lav => Language::Latvian,
Lang::Est => Language::Estonian,
Lang::Tam => Language::Tamil,
Lang::Vie => Language::Vietnamese,
Lang::Urd => Language::Urdu,
Lang::Tha => Language::Thai,
Lang::Guj => Language::Gujarati,
Lang::Uzb => Language::Uzbek,
Lang::Pan => Language::Punjabi,
Lang::Aze => Language::Azerbaijani,
Lang::Ind => Language::Indonesian,
Lang::Tel => Language::Telugu,
Lang::Pes => Language::Persian,
Lang::Mal => Language::Malayalam,
Lang::Ori => Language::Oriya,
Lang::Mya => Language::Burmese,
Lang::Nep => Language::Nepali,
Lang::Sin => Language::Sinhalese,
Lang::Khm => Language::Khmer,
Lang::Tuk => Language::Turkmen,
Lang::Aka => Language::Akan,
Lang::Zul => Language::Zulu,
Lang::Sna => Language::Shona,
Lang::Afr => Language::Afrikaans,
Lang::Lat => Language::Latin,
Lang::Slk => Language::Slovak,
Lang::Cat => Language::Catalan,
Lang::Tgl => Language::Tagalog,
Lang::Hye => Language::Armenian,
},
info.confidence(),
)
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn detect_languages() {
let inputs = [
(
"The quick brown fox jumps over the lazy dog",
Language::English,
),
(
"Jovencillo emponzoñado de whisky: ¡qué figurota exhibe!",
Language::Spanish,
),
(
"Ma la volpe col suo balzo ha raggiunto il quieto Fido",
Language::Italian,
),
(
"Jaz em prisão bota que vexa dez cegonhas felizes",
Language::Portuguese,
),
(
"Zwölf Boxkämpfer jagten Victor quer über den großen Sylter Deich",
Language::German,
),
("עטלף אבק נס דרך מזגן שהתפוצץ כי חם", Language::Hebrew),
(
"Съешь ещё этих мягких французских булок, да выпей же чаю",
Language::Russian,
),
(
"Чуєш їх, доцю, га? Кумедна ж ти, прощайся без ґольфів!",
Language::Ukrainian,
),
(
"Љубазни фењерџија чађавог лица хоће да ми покаже штос",
Language::Serbian,
),
(
"Pijamalı hasta yağız şoföre çabucak güvendi",
Language::Turkish,
),
("己所不欲,勿施于人。", Language::Mandarin),
("井の中の蛙大海を知らず", Language::Japanese),
("시작이 반이다", Language::Korean),
];
let mut detector = LanguageDetector::new();
for input in inputs.iter() {
assert_eq!(detector.detect(input.0, 0.0), input.1);
}
}
#[test]
fn weighted_language() {
let mut detector = LanguageDetector::new();
for lang in [
(Language::Spanish, 0.5, 70),
(Language::Japanese, 0.2, 100),
(Language::Japanese, 0.3, 100),
(Language::Japanese, 0.4, 200),
(Language::English, 0.7, 50),
]
.iter()
{
let w = detector
.lang_detected
.entry(lang.0)
.or_insert_with(|| WeightedAverage {
weight: 0,
confidence: 0.0,
occurrences: 0,
});
w.occurrences += 1;
w.weight += lang.2;
w.confidence += lang.1 * lang.2 as f64;
}
assert_eq!(detector.most_frequent_language(), Some(Language::Japanese));
}
}

161
src/fts/mod.rs Normal file
View file

@ -0,0 +1,161 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
pub mod lang;
//pub mod pdf;
pub mod index;
pub mod search_snippet;
pub mod stemmer;
pub mod term_index;
pub mod tokenizers;
#[derive(Debug, PartialEq, Clone, Copy, Hash, Eq, serde::Serialize, serde::Deserialize)]
pub enum Language {
Esperanto = 0,
English = 1,
Russian = 2,
Mandarin = 3,
Spanish = 4,
Portuguese = 5,
Italian = 6,
Bengali = 7,
French = 8,
German = 9,
Ukrainian = 10,
Georgian = 11,
Arabic = 12,
Hindi = 13,
Japanese = 14,
Hebrew = 15,
Yiddish = 16,
Polish = 17,
Amharic = 18,
Javanese = 19,
Korean = 20,
Bokmal = 21,
Danish = 22,
Swedish = 23,
Finnish = 24,
Turkish = 25,
Dutch = 26,
Hungarian = 27,
Czech = 28,
Greek = 29,
Bulgarian = 30,
Belarusian = 31,
Marathi = 32,
Kannada = 33,
Romanian = 34,
Slovene = 35,
Croatian = 36,
Serbian = 37,
Macedonian = 38,
Lithuanian = 39,
Latvian = 40,
Estonian = 41,
Tamil = 42,
Vietnamese = 43,
Urdu = 44,
Thai = 45,
Gujarati = 46,
Uzbek = 47,
Punjabi = 48,
Azerbaijani = 49,
Indonesian = 50,
Telugu = 51,
Persian = 52,
Malayalam = 53,
Oriya = 54,
Burmese = 55,
Nepali = 56,
Sinhalese = 57,
Khmer = 58,
Turkmen = 59,
Akan = 60,
Zulu = 61,
Shona = 62,
Afrikaans = 63,
Latin = 64,
Slovak = 65,
Catalan = 66,
Tagalog = 67,
Armenian = 68,
Unknown = 69,
None = 70,
}
impl Language {
pub fn from_iso_639(code: &str) -> Option<Self> {
match code.split_once('-').map(|c| c.0).unwrap_or(code) {
"en" => Language::English,
"es" => Language::Spanish,
"pt" => Language::Portuguese,
"it" => Language::Italian,
"fr" => Language::French,
"de" => Language::German,
"ru" => Language::Russian,
"zh" => Language::Mandarin,
"ja" => Language::Japanese,
"ar" => Language::Arabic,
"hi" => Language::Hindi,
"ko" => Language::Korean,
"bn" => Language::Bengali,
"he" => Language::Hebrew,
"ur" => Language::Urdu,
"fa" => Language::Persian,
"ml" => Language::Malayalam,
"or" => Language::Oriya,
"my" => Language::Burmese,
"ne" => Language::Nepali,
"si" => Language::Sinhalese,
"km" => Language::Khmer,
"tk" => Language::Turkmen,
"am" => Language::Amharic,
"az" => Language::Azerbaijani,
"id" => Language::Indonesian,
"te" => Language::Telugu,
"ta" => Language::Tamil,
"vi" => Language::Vietnamese,
"gu" => Language::Gujarati,
"pa" => Language::Punjabi,
"uz" => Language::Uzbek,
"hy" => Language::Armenian,
"ka" => Language::Georgian,
"la" => Language::Latin,
"sl" => Language::Slovene,
"hr" => Language::Croatian,
"sr" => Language::Serbian,
"mk" => Language::Macedonian,
"lt" => Language::Lithuanian,
"lv" => Language::Latvian,
"et" => Language::Estonian,
"tl" => Language::Tagalog,
"af" => Language::Afrikaans,
"zu" => Language::Zulu,
"sn" => Language::Shona,
"ak" => Language::Akan,
_ => return None,
}
.into()
}
}

54
src/fts/pdf.rs Normal file
View file

@ -0,0 +1,54 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::panic;
use lopdf::Document;
// Assumption: `PlainTextOutput` and `output_doc` below come from the `pdf_extract`
// crate, which is not declared in the Cargo.toml shown above (this module is also
// commented out in fts/mod.rs).
use pdf_extract::{output_doc, PlainTextOutput};
pub fn extract_pdf(bytes: &[u8]) -> Option<String> {
panic::catch_unwind(|| {
let mut buf = Vec::<u8>::new();
let mut out = PlainTextOutput::new(&mut buf as &mut dyn std::io::Write);
output_doc(&Document::load_mem(bytes).ok()?, &mut out).ok()?;
match String::from_utf8(buf) {
Ok(result) => result,
Err(err) => String::from_utf8_lossy(err.as_bytes()).into_owned(),
}
.into()
})
.ok()?
}
/*
#[cfg(test)]
mod tests {
#[test]
fn extract_pdf() {
let bytes = include_bytes!("/tmp/pdf/files/ep.pdf");
let text = super::extract_pdf(bytes);
}
}
*/

277
src/fts/search_snippet.rs Normal file
View file

@ -0,0 +1,277 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use super::term_index::Term;
fn escape_char(c: char, string: &mut String) {
match c {
'&' => string.push_str("&amp;"),
'<' => string.push_str("&lt;"),
'>' => string.push_str("&gt;"),
'"' => string.push_str("&quot;"),
'\n' | '\r' => string.push(' '),
_ => string.push(c),
}
}
fn escape_char_len(c: char) -> usize {
match c {
'&' => "&amp;".len(),
'<' => "&lt;".len(),
'>' => "&gt;".len(),
'"' => "&quot;".len(),
'\r' | '\n' => 1,
_ => c.len_utf8(),
}
}
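// Builds an HTML-escaped snippet of at most ~255 bytes around the matched terms,
// wrapping each match in <mark>..</mark> and prepending a little context (up to two
// words or 40 characters) before the first match.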
pub fn generate_snippet(terms: &[Term], text: &str) -> Option<String> {
let mut snippet = String::with_capacity(text.len());
let start_offset = terms.get(0)?.offset as usize;
if start_offset > 0 {
let mut word_count = 0;
let mut from_offset = 0;
let mut last_is_space = false;
if text.len() > 240 {
for (pos, char) in text.get(0..start_offset)?.char_indices().rev() {
// Add up to 2 words or 40 characters of context
if char.is_whitespace() {
if !last_is_space {
word_count += 1;
if word_count == 3 {
break;
}
last_is_space = true;
}
} else {
last_is_space = false;
}
from_offset = pos;
if start_offset - from_offset >= 40 {
break;
}
}
}
last_is_space = false;
for char in text.get(from_offset..start_offset)?.chars() {
if !char.is_whitespace() {
last_is_space = false;
} else {
if last_is_space {
continue;
}
last_is_space = true;
}
escape_char(char, &mut snippet);
}
}
let mut terms = terms.iter().peekable();
'outer: while let Some(term) = terms.next() {
if snippet.len() + ("<mark>".len() * 2) + term.len as usize + 1 > 255 {
break;
}
snippet.push_str("<mark>");
snippet.push_str(text.get(term.offset as usize..term.offset as usize + term.len as usize)?);
snippet.push_str("</mark>");
let next_offset = if let Some(next_term) = terms.peek() {
next_term.offset as usize
} else {
text.len()
};
let mut last_is_space = false;
for char in text
.get(term.offset as usize + term.len as usize..next_offset)?
.chars()
{
if !char.is_whitespace() {
last_is_space = false;
} else {
if last_is_space {
continue;
}
last_is_space = true;
}
if snippet.len() + escape_char_len(char) <= 255 {
escape_char(char, &mut snippet);
} else {
break 'outer;
}
}
}
Some(snippet)
}
#[cfg(test)]
mod tests {
use crate::{
fts::{
term_index::{TermIndex, TermIndexBuilder},
tokenizers::Tokenizer,
Language,
},
Deserialize, Serialize,
};
use super::*;
#[test]
fn search_snippets() {
let inputs = [
(vec![
"Help a friend from Abidjan Côte d'Ivoire",
concat!(
"When my mother died when she was given birth to me, my father took me so ",
"special because I am motherless. Before the death of my late father on 22nd June ",
"2013 in a private hospital here in Abidjan Côte d'Ivoire. He secretly called me on his ",
"bedside and told me that he has a sum of $7.5M (Seven Million five Hundred ",
"Thousand Dollars) left in a suspense account in a local bank here in Abidjan Côte ",
"d'Ivoire, that he used my name as his only daughter for the next of kin in deposit of ",
"the fund. ",
"I am 24year old. Dear I am honorably seeking your assistance in the following ways. ",
"1) To provide any bank account where this money would be transferred into. ",
"2) To serve as the guardian of this fund. ",
"3) To make arrangement for me to come over to your country to further my ",
"education and to secure a residential permit for me in your country. ",
"Moreover, I am willing to offer you 30 percent of the total sum as compensation for ",
"your effort input after the successful transfer of this fund to your nominated ",
"account overseas."
)],
vec![
(
vec!["côte"],
vec![
"Help a friend from Abidjan <mark>Côte</mark> d'Ivoire",
concat!(
"in Abidjan <mark>Côte</mark> d'Ivoire. He secretly called me on his bedside ",
"and told me that he has a sum of $7.5M (Seven Million five Hundred Thousand ",
"Dollars) left in a suspense account in a local bank here in Abidjan ",
"<mark>Côte</mark> d'Ivoire, that ")
]
),
(
vec!["your", "country"],
vec![
concat!(
"honorably seeking <mark>your</mark> assistance in the following ways. ",
"1) To provide any bank account where this money would be transferred into. 2) ",
"To serve as the guardian of this fund. 3) To make arrangement for me to come ",
"over to <mark>your</mark> "
)]
),
(
vec!["overseas"],
vec![
"nominated account <mark>overseas</mark>."
]
),
],
),
(vec![
"孫子兵法",
concat!(
"<\"孫子兵法:\">",
"孫子曰:兵者,國之大事,死生之地,存亡之道,不可不察也。",
"孫子曰:凡用兵之法,馳車千駟,革車千乘,帶甲十萬;千里饋糧,則內外之費賓客之用,膠漆之材,",
"車甲之奉,日費千金,然後十萬之師舉矣。",
"孫子曰:凡用兵之法,全國為上,破國次之;全旅為上,破旅次之;全卒為上,破卒次之;全伍為上,破伍次之。",
"是故百戰百勝,非善之善者也;不戰而屈人之兵,善之善者也。",
"孫子曰:昔之善戰者,先為不可勝,以待敵之可勝,不可勝在己,可勝在敵。故善戰者,能為不可勝,不能使敵必可勝。",
"故曰:勝可知,而不可為。",
"兵者,詭道也。故能而示之不能,用而示之不用,近而示之遠,遠而示之近。利而誘之,亂而取之,實而備之,強而避之,",
"怒而撓之,卑而驕之,佚而勞之,親而離之。攻其無備,出其不意,此兵家之勝,不可先傳也。",
"夫未戰而廟算勝者,得算多也;未戰而廟算不勝者,得算少也;多算勝,少算不勝,而況於無算乎?吾以此觀之,勝負見矣。",
"孫子曰:凡治眾如治寡,分數是也。鬥眾如鬥寡,形名是也。三軍之眾,可使必受敵而無敗者,奇正是也。兵之所加,",
"如以碬投卵者,虛實是也。",
)],
vec![
(
vec!["孫子兵法"],
vec![
"<mark>孫子兵法</mark>",
concat!(
"&lt;&quot;<mark>孫子兵法</mark>&quot;&gt;孫子曰:兵者,國之大事,死生之地,存亡之道,",
"不可不察也。孫子曰:凡用兵之法,馳車千駟,革車千乘,帶甲十萬;千里饋糧,則內外之費賓客之用,膠"),
]
),
(
vec!["孫子曰"],
vec![
concat!(
"&lt;&quot;孫子兵法:&quot;&gt;<mark>孫子曰</mark>:兵者,國之大事,死生之地,存亡之道,",
"不可不察也。<mark>孫子曰</mark>:凡用兵之法,馳車千駟,革車千乘,帶甲十萬;千里饋糧,則內外之費賓",
)]
),
],
),
];
for (parts, tests) in inputs {
let mut builder = TermIndexBuilder::new();
for (field_num, part) in parts.iter().enumerate() {
let mut terms = Vec::new();
for token in Tokenizer::new(part, Language::English, 40) {
terms.push(builder.add_token(token));
}
builder.add_terms(field_num as u8, 0, terms);
}
let compressed_term_index = builder.serialize();
let term_index = TermIndex::deserialize(&compressed_term_index[..]).unwrap();
for (match_words, snippets) in tests {
let mut match_terms = Vec::new();
for word in &match_words {
match_terms.push(term_index.get_match_term(word, None));
}
let term_groups = term_index
.match_terms(&match_terms, None, false, true, true)
.unwrap()
.unwrap();
assert_eq!(term_groups.len(), snippets.len());
for (term_group, snippet) in term_groups.iter().zip(snippets.iter()) {
assert_eq!(
snippet,
&generate_snippet(&term_group.terms, parts[term_group.field_id as usize])
.unwrap()
);
}
}
}
}
}

168
src/fts/stemmer.rs Normal file
View file

@ -0,0 +1,168 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::borrow::Cow;
use rust_stemmers::Algorithm;
use super::{tokenizers::Tokenizer, Language};
#[derive(Debug, PartialEq, Eq)]
pub struct StemmedToken<'x> {
pub word: Cow<'x, str>,
pub stemmed_word: Option<Cow<'x, str>>,
pub offset: u32, // Word offset in the text part
pub len: u8, // Word length
}
pub struct Stemmer<'x> {
stemmer: Option<rust_stemmers::Stemmer>,
tokenizer: Tokenizer<'x>,
}
impl<'x> Stemmer<'x> {
pub fn new(text: &'x str, language: Language, max_token_length: usize) -> Stemmer<'x> {
Stemmer {
tokenizer: Tokenizer::new(text, language, max_token_length),
stemmer: STEMMER_MAP[language as usize].map(rust_stemmers::Stemmer::create),
}
}
}
impl<'x> Iterator for Stemmer<'x> {
type Item = StemmedToken<'x>;
fn next(&mut self) -> Option<Self::Item> {
let token = self.tokenizer.next()?;
Some(StemmedToken {
stemmed_word: self.stemmer.as_ref().and_then(|stemmer| {
match stemmer.stem(&token.word) {
Cow::Owned(text) if text.len() != token.len as usize || text != token.word => {
Some(text.into())
}
_ => None,
}
}),
word: token.word,
offset: token.offset,
len: token.len,
})
}
}
static STEMMER_MAP: &[Option<Algorithm>] = &[
None, // Esperanto = 0,
Some(Algorithm::English), // English = 1,
Some(Algorithm::Russian), // Russian = 2,
None, // Mandarin = 3,
Some(Algorithm::Spanish), // Spanish = 4,
Some(Algorithm::Portuguese), // Portuguese = 5,
Some(Algorithm::Italian), // Italian = 6,
None, // Bengali = 7,
Some(Algorithm::French), // French = 8,
Some(Algorithm::German), // German = 9,
None, // Ukrainian = 10,
None, // Georgian = 11,
Some(Algorithm::Arabic), // Arabic = 12,
None, // Hindi = 13,
None, // Japanese = 14,
None, // Hebrew = 15,
None, // Yiddish = 16,
None, // Polish = 17,
None, // Amharic = 18,
None, // Javanese = 19,
None, // Korean = 20,
Some(Algorithm::Norwegian), // Bokmal = 21,
Some(Algorithm::Danish), // Danish = 22,
Some(Algorithm::Swedish), // Swedish = 23,
Some(Algorithm::Finnish), // Finnish = 24,
Some(Algorithm::Turkish), // Turkish = 25,
Some(Algorithm::Dutch), // Dutch = 26,
Some(Algorithm::Hungarian), // Hungarian = 27,
None, // Czech = 28,
Some(Algorithm::Greek), // Greek = 29,
None, // Bulgarian = 30,
None, // Belarusian = 31,
None, // Marathi = 32,
None, // Kannada = 33,
Some(Algorithm::Romanian), // Romanian = 34,
None, // Slovene = 35,
None, // Croatian = 36,
None, // Serbian = 37,
None, // Macedonian = 38,
None, // Lithuanian = 39,
None, // Latvian = 40,
None, // Estonian = 41,
Some(Algorithm::Tamil), // Tamil = 42,
None, // Vietnamese = 43,
None, // Urdu = 44,
None, // Thai = 45,
None, // Gujarati = 46,
None, // Uzbek = 47,
None, // Punjabi = 48,
None, // Azerbaijani = 49,
None, // Indonesian = 50,
None, // Telugu = 51,
None, // Persian = 52,
None, // Malayalam = 53,
None, // Oriya = 54,
None, // Burmese = 55,
None, // Nepali = 56,
None, // Sinhalese = 57,
None, // Khmer = 58,
None, // Turkmen = 59,
None, // Akan = 60,
None, // Zulu = 61,
None, // Shona = 62,
None, // Afrikaans = 63,
None, // Latin = 64,
None, // Slovak = 65,
None, // Catalan = 66,
None, // Tagalog = 67,
None, // Armenian = 68,
None, // Unknown = 69,
];
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn stemmer() {
let inputs = [
(
"love loving lovingly loved lovely",
Language::English,
"love",
),
("querer queremos quer", Language::Spanish, "quer"),
];
for (input, language, result) in inputs {
for token in Stemmer::new(input, language, 40) {
assert_eq!(token.stemmed_word.unwrap_or(token.word), result);
}
}
}
}

995
src/fts/term_index.rs Normal file
View file

@ -0,0 +1,995 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::convert::TryInto;
use crate::{Deserialize, Serialize};
use super::{stemmer::StemmedToken, tokenizers::Token};
use ahash::{AHashMap, AHashSet};
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
use utils::codec::leb128::{Leb128Reader, Leb128Vec};
#[derive(Debug)]
pub enum Error {
DataCorruption,
Leb128DecodeError,
BitpackDecodeError,
InvalidArgument,
}
pub type TermId = u32;
pub type Result<T> = std::result::Result<T, Error>;
const LENGTH_SIZE: usize = std::mem::size_of::<u32>();
#[derive(Debug, PartialEq, Eq)]
pub struct Term {
pub id: TermId,
pub id_stemmed: TermId,
pub offset: u32,
pub len: u8,
}
#[derive(Debug)]
pub struct TermGroup {
pub field_id: u8,
pub part_id: u32,
pub terms: Vec<Term>,
}
#[derive(Debug)]
pub struct TermIndexBuilderItem {
field: u8,
part_id: u32,
terms: Vec<Term>,
}
#[derive(Debug)]
pub struct TermIndexBuilder {
terms: AHashMap<String, u32>,
items: Vec<TermIndexBuilderItem>,
}
#[derive(Debug)]
pub struct TermIndexItem {
pub field_id: u8,
pub part_id: u32,
pub terms_len: usize,
pub terms: Vec<u8>,
}
#[derive(Debug, Default)]
pub struct TermIndex {
pub token_map: AHashMap<String, u32>,
pub items: Vec<TermIndexItem>,
}
#[derive(Debug, PartialEq, Eq)]
pub struct MatchTerm {
pub id: TermId,
pub id_stemmed: TermId,
}
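// Wrapper around the three bitpacking block sizes; `block_len` selects which of the
// 32/128/256-integer packers a call is routed to.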
#[derive(Clone, Copy)]
struct TermIndexPacker {
bitpacker_1: BitPacker1x,
bitpacker_4: BitPacker4x,
bitpacker_8: BitPacker8x,
block_len: usize,
}
impl TermIndexPacker {
pub fn with_block_len(block_len: usize) -> Self {
TermIndexPacker {
bitpacker_1: BitPacker1x::new(),
bitpacker_4: BitPacker4x::new(),
bitpacker_8: BitPacker8x::new(),
block_len,
}
}
pub fn block_len(&mut self, num: usize) {
self.block_len = num;
}
}
impl BitPacker for TermIndexPacker {
const BLOCK_LEN: usize = 0;
fn new() -> Self {
TermIndexPacker {
bitpacker_1: BitPacker1x::new(),
bitpacker_4: BitPacker4x::new(),
bitpacker_8: BitPacker8x::new(),
block_len: 1,
}
}
fn compress(&self, decompressed: &[u32], compressed: &mut [u8], num_bits: u8) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => self
.bitpacker_8
.compress(decompressed, compressed, num_bits),
BitPacker4x::BLOCK_LEN => self
.bitpacker_4
.compress(decompressed, compressed, num_bits),
_ => self
.bitpacker_1
.compress(decompressed, compressed, num_bits),
}
}
fn compress_sorted(
&self,
initial: u32,
decompressed: &[u32],
compressed: &mut [u8],
num_bits: u8,
) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => {
self.bitpacker_8
.compress_sorted(initial, decompressed, compressed, num_bits)
}
BitPacker4x::BLOCK_LEN => {
self.bitpacker_4
.compress_sorted(initial, decompressed, compressed, num_bits)
}
_ => self
.bitpacker_1
.compress_sorted(initial, decompressed, compressed, num_bits),
}
}
fn decompress(&self, compressed: &[u8], decompressed: &mut [u32], num_bits: u8) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => {
self.bitpacker_8
.decompress(compressed, decompressed, num_bits)
}
BitPacker4x::BLOCK_LEN => {
self.bitpacker_4
.decompress(compressed, decompressed, num_bits)
}
_ => self
.bitpacker_1
.decompress(compressed, decompressed, num_bits),
}
}
fn decompress_sorted(
&self,
initial: u32,
compressed: &[u8],
decompressed: &mut [u32],
num_bits: u8,
) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => {
self.bitpacker_8
.decompress_sorted(initial, compressed, decompressed, num_bits)
}
BitPacker4x::BLOCK_LEN => {
self.bitpacker_4
.decompress_sorted(initial, compressed, decompressed, num_bits)
}
_ => self
.bitpacker_1
.decompress_sorted(initial, compressed, decompressed, num_bits),
}
}
fn num_bits(&self, decompressed: &[u32]) -> u8 {
match self.block_len {
BitPacker8x::BLOCK_LEN => self.bitpacker_8.num_bits(decompressed),
BitPacker4x::BLOCK_LEN => self.bitpacker_4.num_bits(decompressed),
_ => self.bitpacker_1.num_bits(decompressed),
}
}
fn num_bits_sorted(&self, initial: u32, decompressed: &[u32]) -> u8 {
match self.block_len {
BitPacker8x::BLOCK_LEN => self.bitpacker_8.num_bits_sorted(initial, decompressed),
BitPacker4x::BLOCK_LEN => self.bitpacker_4.num_bits_sorted(initial, decompressed),
_ => self.bitpacker_1.num_bits_sorted(initial, decompressed),
}
}
}
#[allow(clippy::new_without_default)]
impl TermIndexBuilder {
pub fn new() -> TermIndexBuilder {
TermIndexBuilder {
items: Vec::new(),
terms: AHashMap::default(),
}
}
pub fn add_token(&mut self, token: Token) -> Term {
let id = self.terms.len() as u32;
let id = self
.terms
.entry(token.word.into_owned())
.or_insert_with(|| id);
Term {
id: *id,
id_stemmed: *id,
offset: token.offset,
len: token.len,
}
}
pub fn add_stemmed_token(&mut self, token: StemmedToken) -> Term {
let id = self.terms.len() as u32;
let id = *self
.terms
.entry(token.word.into_owned())
.or_insert_with(|| id);
let id_stemmed = if let Some(stemmed_word) = token.stemmed_word {
let id_stemmed = self.terms.len() as u32;
*self
.terms
.entry(stemmed_word.into_owned())
.or_insert_with(|| id_stemmed)
} else {
id
};
Term {
id,
id_stemmed,
offset: token.offset,
len: token.len,
}
}
pub fn add_terms(&mut self, field: u8, part_id: u32, terms: Vec<Term>) {
self.items.push(TermIndexBuilderItem {
field,
part_id,
terms,
});
}
pub fn is_empty(&self) -> bool {
self.items.is_empty()
}
}
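// Serialized layout: a LEB128 token count followed by the nul-terminated tokens, then
// one block per indexed part consisting of a little-endian u32 length, the field id,
// the LEB128 part id and term count, the bit-packed (term id, stemmed id) pairs, the
// delta-encoded bit-packed offsets, and finally the raw term lengths.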
impl Serialize for TermIndexBuilder {
fn serialize(self) -> Vec<u8> {
// Add tokens
if self.terms.is_empty() {
return Vec::new();
}
let mut terms = vec![""; self.terms.len()];
let mut terms_len = 0;
for (word, id) in &self.terms {
terms[*id as usize] = word;
terms_len += word.len() + 1;
}
// Serialize tokens
let mut bytes = Vec::with_capacity(
terms_len + ((self.items.len() / self.terms.len()) * std::mem::size_of::<u64>() * 2),
);
bytes.push_leb128(self.terms.len());
for terms in terms {
bytes.extend_from_slice(terms.as_bytes());
bytes.push(0);
}
// Write terms
let mut bitpacker = TermIndexPacker::new();
let mut compressed = vec![0u8; 4 * BitPacker8x::BLOCK_LEN];
for term_index in &self.items {
let mut ids = Vec::with_capacity(term_index.terms.len() * 2);
let mut offsets = Vec::with_capacity(term_index.terms.len());
let mut lengths = Vec::with_capacity(term_index.terms.len());
let header_pos = bytes.len();
bytes.extend_from_slice(&[0u8; LENGTH_SIZE]);
bytes.push(term_index.field);
bytes.push_leb128(term_index.part_id);
bytes.push_leb128(term_index.terms.len());
let terms_pos = bytes.len();
for term in &term_index.terms {
ids.push(term.id);
ids.push(term.id_stemmed);
offsets.push(term.offset);
lengths.push(term.len);
}
for (chunk, is_sorted) in [(ids, false), (offsets, true)] {
let mut pos = 0;
let len = chunk.len();
let mut initial_value = 0;
while pos < len {
let block_len = match len - pos {
0..=31 => 0,
32..=127 => BitPacker1x::BLOCK_LEN,
128..=255 => BitPacker4x::BLOCK_LEN,
_ => BitPacker8x::BLOCK_LEN,
};
if block_len > 0 {
let chunk = &chunk[pos..pos + block_len];
bitpacker.block_len(block_len);
if is_sorted {
let num_bits: u8 = bitpacker.num_bits_sorted(initial_value, chunk);
let compressed_len = bitpacker.compress_sorted(
initial_value,
chunk,
&mut compressed[..],
num_bits,
);
bytes.push(num_bits);
bytes.extend_from_slice(&compressed[..compressed_len]);
initial_value = chunk[chunk.len() - 1];
} else {
let num_bits: u8 = bitpacker.num_bits(chunk);
let compressed_len =
bitpacker.compress(chunk, &mut compressed[..], num_bits);
bytes.push(num_bits);
bytes.extend_from_slice(&compressed[..compressed_len]);
}
pos += block_len;
} else {
for val in &chunk[pos..] {
bytes.push_leb128(*val);
}
pos = len;
}
}
}
bytes.append(&mut lengths);
let len = (bytes.len() - terms_pos) as u32;
bytes[header_pos..header_pos + LENGTH_SIZE].copy_from_slice(&len.to_le_bytes());
}
bytes
}
}
impl Deserialize for TermIndex {
fn deserialize(bytes: &[u8]) -> Option<Self> {
let (num_tokens, mut pos) = bytes.read_leb128()?;
let mut token_map = AHashMap::with_capacity(num_tokens as usize);
for term_id in 0..num_tokens {
let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?;
token_map.insert(
String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?,
term_id,
);
pos += nil_pos + 1;
}
let mut term_index = TermIndex {
items: Vec::new(),
token_map,
};
while pos < bytes.len() {
let item_len =
u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize;
pos += LENGTH_SIZE;
let field = bytes.get(pos)?;
pos += 1;
let (part_id, bytes_read) = bytes.get(pos..)?.read_leb128()?;
pos += bytes_read;
let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128()?;
pos += bytes_read;
term_index.items.push(TermIndexItem {
field_id: *field,
part_id,
terms_len,
terms: bytes.get(pos..pos + item_len)?.to_vec(),
});
pos += item_len;
}
Some(term_index)
}
}
impl TermIndex {
pub fn get_match_term(&self, word: &str, stemmed_word: Option<&str>) -> MatchTerm {
let id = self.token_map.get(word).copied().unwrap_or(u32::MAX);
let id_stemmed = stemmed_word
.and_then(|word| self.token_map.get(word))
.copied()
.unwrap_or(id);
MatchTerm { id, id_stemmed }
}
fn skip_items(&self, bytes: &[u8], mut remaining_items: usize) -> Result<usize> {
let mut pos = 0;
while remaining_items > 0 {
let block_len = match remaining_items {
0..=31 => 0,
32..=127 => BitPacker1x::BLOCK_LEN,
128..=255 => BitPacker4x::BLOCK_LEN,
_ => BitPacker8x::BLOCK_LEN,
};
if block_len > 0 {
pos +=
((*bytes.get(pos).ok_or(Error::DataCorruption)? as usize) * block_len / 8) + 1;
remaining_items -= block_len;
} else {
while remaining_items > 0 {
let bytes_read = bytes
.get(pos..)
.ok_or(Error::DataCorruption)?
.skip_leb128()
.ok_or(Error::Leb128DecodeError)?;
pos += bytes_read;
remaining_items -= 1;
}
}
}
Ok(pos)
}
fn uncompress_chunk(
bytes: &[u8],
remaining_items: usize,
initial_value: Option<u32>,
) -> Result<(usize, Vec<u32>)> {
let block_len = match remaining_items {
0..=31 => 0,
32..=127 => BitPacker1x::BLOCK_LEN,
128..=255 => BitPacker4x::BLOCK_LEN,
_ => BitPacker8x::BLOCK_LEN,
};
if block_len > 0 {
let bitpacker = TermIndexPacker::with_block_len(block_len);
let num_bits = *bytes.first().ok_or(Error::DataCorruption)?;
let bytes_read = ((num_bits as usize) * block_len / 8) + 1;
let mut decompressed = vec![0u32; block_len];
if let Some(initial_value) = initial_value {
bitpacker.decompress_sorted(
initial_value,
bytes.get(1..bytes_read).ok_or(Error::DataCorruption)?,
&mut decompressed[..],
num_bits,
);
} else {
bitpacker.decompress(
bytes.get(1..bytes_read).ok_or(Error::DataCorruption)?,
&mut decompressed[..],
num_bits,
);
}
Ok((bytes_read, decompressed))
} else {
let mut decompressed = Vec::with_capacity(remaining_items);
let mut pos = 0;
while decompressed.len() < remaining_items {
let (val, bytes_read) = bytes
.get(pos..)
.ok_or(Error::DataCorruption)?
.read_leb128()
.ok_or(Error::Leb128DecodeError)?;
decompressed.push(val);
pos += bytes_read;
}
Ok((pos, decompressed))
}
}
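// Matches the given terms against this index. `match_in` restricts matching to a set of
// fields, `match_phrase` requires the terms to appear consecutively, `match_many` keeps
// searching after the first matching part, and `include_offsets` resolves word positions
// and lengths. Returns the matching term groups, or None when nothing matched.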
pub fn match_terms(
&self,
match_terms: &[MatchTerm],
match_in: Option<AHashSet<u8>>,
match_phrase: bool,
match_many: bool,
include_offsets: bool,
) -> Result<Option<Vec<TermGroup>>> {
let mut result = Vec::new();
// Safety check to avoid overflowing the bit mask
if !match_phrase && !(1..=64).contains(&match_terms.len()) {
return Err(Error::InvalidArgument);
}
// Term matching is done using a bit mask, where each bit represents a word.
// Each time a word is matched, the corresponding bit is cleared.
// When all bits are cleared, all matching terms are added to the result list.
let words_mask: u64 = u64::MAX >> (64 - match_terms.len());
let mut matched_mask = words_mask;
for item in &self.items {
if let Some(ref match_in) = match_in {
if !match_in.contains(&item.field_id) {
continue;
}
}
let mut terms = Vec::new();
let mut partial_match = Vec::new();
let mut term_pos = 0;
let mut byte_pos = 0;
'term_loop: while term_pos < item.terms_len {
let (bytes_read, chunk) = TermIndex::uncompress_chunk(
item.terms.get(byte_pos..).ok_or(Error::DataCorruption)?,
(item.terms_len * 2) - (term_pos * 2),
None,
)?;
byte_pos += bytes_read;
for encoded_term in chunk.chunks_exact(2) {
let term_id = encoded_term[0];
let term_id_stemmed = encoded_term[1];
if match_phrase {
let match_pos = partial_match.len();
if match_terms[match_pos].id == term_id {
partial_match.push(Term {
id: term_id,
id_stemmed: term_id_stemmed,
offset: term_pos as u32,
len: 0,
});
if partial_match.len() == match_terms.len() {
terms.append(&mut partial_match);
if !match_many {
break 'term_loop;
}
}
} else if match_pos > 0 {
partial_match.clear();
}
} else {
'match_loop: for (match_pos, match_term) in match_terms.iter().enumerate() {
if match_term.id == term_id
|| match_term.id == term_id_stemmed
|| ((match_term.id_stemmed != match_term.id)
&& (match_term.id_stemmed == term_id
|| match_term.id_stemmed == term_id_stemmed))
{
partial_match.push(Term {
id: term_id,
id_stemmed: term_id_stemmed,
offset: term_pos as u32,
len: 0,
});
// Clear the bit corresponding to the matched term
matched_mask &= !(1 << match_pos);
break 'match_loop;
}
}
if !match_many && matched_mask == 0 {
break 'term_loop;
}
}
term_pos += 1;
}
}
if !match_phrase && !partial_match.is_empty() {
terms.append(&mut partial_match);
}
if !terms.is_empty() {
if include_offsets {
// Skip any term ids that were not uncompressed
if term_pos < item.terms_len {
byte_pos += self.skip_items(
item.terms.get(byte_pos..).ok_or(Error::DataCorruption)?,
(item.terms_len - term_pos) * 2,
)?;
}
// Uncompress offsets
let mut term_it = terms.iter_mut();
let mut term = term_it.next().unwrap();
let mut initial_value = 0;
term_pos = 0;
'outer: while term_pos < item.terms_len {
let (bytes_read, chunk) = TermIndex::uncompress_chunk(
item.terms.get(byte_pos..).ok_or(Error::DataCorruption)?,
item.terms_len - term_pos,
Some(initial_value),
)?;
initial_value = chunk[chunk.len() - 1];
byte_pos += bytes_read;
for offset in chunk.into_iter() {
if term.offset == term_pos as u32 {
term.len = *item
.terms
.get(item.terms.len() - item.terms_len + term.offset as usize)
.ok_or(Error::DataCorruption)?;
term.offset = offset;
if let Some(next_term) = term_it.next() {
term = next_term;
} else {
break 'outer;
}
}
term_pos += 1;
}
}
}
result.push(TermGroup {
field_id: item.field_id,
part_id: item.part_id,
terms,
});
if !match_many {
break;
}
}
}
Ok(if !result.is_empty() {
Some(result)
} else {
None
})
}
}
#[derive(Default)]
pub struct Terms {
pub field_id: u8,
pub exact_terms: AHashSet<TermId>,
pub stemmed_terms: AHashSet<TermId>,
}
pub struct TokenIndex {
pub tokens: Vec<String>,
pub terms: Vec<Terms>,
}
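// Rebuilds the token list and, for every indexed field, the sets of exact and stemmed
// term ids; the compressed offsets are skipped.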
impl Deserialize for TokenIndex {
fn deserialize(bytes: &[u8]) -> Option<Self> {
let (num_tokens, mut pos) = bytes.read_leb128::<u32>()?;
let mut tokens = Vec::with_capacity(num_tokens as usize);
for _ in 0..num_tokens {
let nil_pos = bytes.get(pos..)?.iter().position(|b| b == &0)?;
tokens.push(String::from_utf8(bytes.get(pos..pos + nil_pos)?.to_vec()).ok()?);
pos += nil_pos + 1;
}
let mut terms = Vec::new();
while pos < bytes.len() {
let item_len =
u32::from_le_bytes(bytes.get(pos..pos + LENGTH_SIZE)?.try_into().ok()?) as usize;
pos += LENGTH_SIZE;
let mut field_terms = Terms {
field_id: *bytes.get(pos)?,
exact_terms: AHashSet::default(),
stemmed_terms: AHashSet::default(),
};
pos += 1;
let bytes_read = bytes.get(pos..)?.skip_leb128()?;
pos += bytes_read;
let (terms_len, bytes_read) = bytes.get(pos..)?.read_leb128::<usize>()?;
pos += bytes_read;
let mut term_pos = 0;
let mut byte_pos = pos;
while term_pos < terms_len {
let (bytes_read, chunk) = TermIndex::uncompress_chunk(
bytes.get(byte_pos..)?,
(terms_len - term_pos) * 2,
None,
)
.ok()?;
byte_pos += bytes_read;
for encoded_term in chunk.chunks_exact(2) {
let term_id = encoded_term[0];
let term_id_stemmed = encoded_term[1];
field_terms.exact_terms.insert(term_id);
if term_id != term_id_stemmed {
field_terms.stemmed_terms.insert(term_id_stemmed);
}
term_pos += 1;
}
}
terms.push(field_terms);
pos += item_len;
}
Some(TokenIndex { tokens, terms })
}
}
#[cfg(test)]
mod tests {
use ahash::{AHashMap, AHashSet};
use crate::{
fts::{
stemmer::Stemmer,
term_index::{TermIndexBuilder, TokenIndex},
Language,
},
Deserialize, Serialize,
};
use super::TermIndex;
#[test]
#[allow(clippy::bind_instead_of_map)]
fn term_index() {
const SUBJECT: u8 = 1;
const BODY: u8 = 2;
const ATTACHMENT: u8 = 3;
let parts = [
(
r#"I felt happy because I saw the others were happy
and because I knew I should feel happy, but I wasn't
really happy."#,
SUBJECT,
),
(
r#"But good morning! Good morning to ye and thou! Id
say to all my patients, because I was the worse of the
hypocrites, of all the hypocrites, the cruel and phony
hypocrites, I was the very worst."#,
BODY,
),
(
r#"So I said yes to Thomas Clinton and later thought
that I had said yes to God and later still realized I
had said yes only to Thomas Clinton."#,
BODY,
),
(
r#"Even if they are djinns, I will get djinns that
can outdjinn them."#,
BODY,
),
(
r#"Hatred was spreading everywhere, blood was being
spilled everywhere, wars were breaking out
everywhere."#,
BODY,
),
(
r#"Almost nothing was more annoying than having
our wasted time wasted on something not worth
wasting it on."#,
BODY,
),
(
r#"The depressed person was in terrible and unceasing
emotional pain, and the impossibility of sharing or
articulating this pain was itself a component of the
pain and a contributing factor in its essential horror."#,
BODY,
),
(
r#"Paranoids are not paranoid because theyre paranoid,
but because they keep putting themselves, darn idiots,
deliberately into paranoid situations."#,
BODY,
),
(
r#"Because the world is a place of silence, the sky at
night when the birds have gone is a vast silent place."#,
BODY,
),
(
r#"There are some things that are so unforgivable that
they make other things easily forgivable."#,
BODY,
),
(
r#"I had known loneliness before, and emptiness upon the
moor, but I had never been a NOTHING, a nothing floating
on a nothing, known by nothing, lonelier and colder than
the space between the stars."#,
ATTACHMENT,
),
(
r#"Youre an insomniac, you tell yourself: there are
profound truths revealed only to the insomniac by night
like those phosphorescent minerals veined and glimmering
in the dark but coarse and ordinary otherwise; you have
to examine such minerals in the absence of light to
discover their beauty, you tell yourself."#,
ATTACHMENT,
),
(
r#"Every person had a star, every star had a friend,
and for every person carrying a star there was someone
else who reflected it, and everyone carried this reflection
like a secret confidante in the heart."#,
ATTACHMENT,
),
(
r#"As my grandfather went, arm over arm, his heart making
sour little shudders against his ribs, he kept listening
for a sound, the sound of the tiger, the sound of anything
but his own feet and lungs."#,
ATTACHMENT,
),
(r#"love loving lovingly loved lovely"#, ATTACHMENT),
];
let mut builder = TermIndexBuilder::new();
let mut stemmed_word_ids = AHashMap::default();
// Build the term index
for (part_id, (text, field_id)) in parts.iter().enumerate() {
let mut terms = Vec::new();
for token in Stemmer::new(text, Language::English, 40) {
let stemmed_word = token.stemmed_word.clone();
let term = builder.add_stemmed_token(token);
if let Some(stemmed_word) = stemmed_word {
stemmed_word_ids.insert(term.id_stemmed, stemmed_word.into_owned());
}
terms.push(term);
}
builder.add_terms(*field_id, part_id as u32, terms);
}
let compressed_term_index = builder.serialize();
let term_index = TermIndex::deserialize(&compressed_term_index[..]).unwrap();
assert_eq!(
15,
TokenIndex::deserialize(&compressed_term_index[..])
.unwrap()
.terms
.len()
);
for (words, field_id, match_phrase, match_count) in [
(vec!["thomas", "clinton"], None, true, 4),
(vec!["was", "the", "worse"], None, true, 3),
(vec!["carri"], None, false, 2),
(vec!["nothing", "floating"], None, true, 2),
(vec!["floating", "nothing"], None, false, 6),
(vec!["floating", "nothing"], None, true, 0),
(vec!["noth", "floating"], None, true, 0),
(vec!["noth", "floating"], None, false, 6),
(vec!["realli", "happi"], None, false, 5),
(vec!["really", "happy"], None, true, 2),
(vec!["should", "feel", "happy", "but"], None, true, 4),
(
vec!["love", "loving", "lovingly", "loved", "lovely"],
Some(ATTACHMENT),
true,
5,
),
(vec!["love"], Some(ATTACHMENT), false, 5),
(vec!["but"], None, false, 6),
(vec!["but"], None, true, 6),
] {
let mut match_terms = Vec::new();
for word in &words {
let stemmed_token = Stemmer::new(word, Language::English, 40)
.next()
.and_then(|w| w.stemmed_word);
match_terms.push(
term_index.get_match_term(word, stemmed_token.as_ref().map(|w| w.as_ref())),
);
}
let result = term_index
.match_terms(
&match_terms,
field_id.and_then(|f| {
let mut h = AHashSet::default();
h.insert(f);
Some(h)
}),
match_phrase,
true,
true,
)
.unwrap()
.unwrap_or_default();
let mut result_len = 0;
for r in &result {
result_len += r.terms.len();
}
if result_len != match_count {
for term_group in result {
let part = &parts[term_group.part_id as usize].0;
println!("-> Part id {}", term_group.part_id);
for term in term_group.terms {
println!(
"[{}] ",
&part[term.offset as usize..term.offset as usize + term.len as usize]
);
}
}
panic!(
"Expected {}, got {} for words {:?}, match phrase {:?}.",
match_count, result_len, words, match_phrase
);
}
for term_group in &result {
'outer: for term in &term_group.terms {
let text_word = parts[term_group.part_id as usize].0
[term.offset as usize..term.offset as usize + term.len as usize]
.to_lowercase();
let token_stemmed_word = if term.id_stemmed != term.id {
stemmed_word_ids.get(&term.id_stemmed)
} else {
None
};
for word in words.iter() {
if word == &text_word
|| !match_phrase
&& word == token_stemmed_word.unwrap_or(&"".to_string())
{
continue 'outer;
}
}
panic!("({:?}, {}) != {:?}", words, match_phrase, result);
}
}
}
}
}

197
src/fts/tokenizers/chinese.rs Normal file
View file

@ -0,0 +1,197 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{borrow::Cow, vec::IntoIter};
use jieba_rs::Jieba;
use super::{word::WordTokenizer, Token};
use lazy_static::lazy_static;
lazy_static! {
static ref JIEBA: Jieba = Jieba::new();
}
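// Tokenizer that hands non-ASCII word runs to the Jieba segmenter and passes ASCII
// words through unchanged, subject to the maximum token length.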
pub struct ChineseTokenizer<'x> {
word_tokenizer: WordTokenizer<'x>,
tokens: IntoIter<&'x str>,
token_offset: usize,
token_len: usize,
token_len_cur: usize,
max_token_length: usize,
}
impl<'x> ChineseTokenizer<'x> {
pub fn new(text: &str, max_token_length: usize) -> ChineseTokenizer {
ChineseTokenizer {
word_tokenizer: WordTokenizer::new(text),
tokens: Vec::new().into_iter(),
max_token_length,
token_offset: 0,
token_len: 0,
token_len_cur: 0,
}
}
}
impl<'x> Iterator for ChineseTokenizer<'x> {
type Item = Token<'x>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(ch_token) = self.tokens.next() {
let offset_start = self.token_offset + self.token_len_cur;
self.token_len_cur += ch_token.len();
if ch_token.len() <= self.max_token_length {
return Token::new(offset_start, ch_token.len(), ch_token.into()).into();
}
} else {
loop {
let (token, is_ascii) = self.word_tokenizer.next()?;
if !is_ascii {
let word = match token.word {
Cow::Borrowed(word) => word,
Cow::Owned(_) => unreachable!(),
};
self.tokens = JIEBA.cut(word, false).into_iter();
self.token_offset = token.offset as usize;
self.token_len = token.len as usize;
self.token_len_cur = 0;
break;
} else if token.len as usize <= self.max_token_length {
return token.into();
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn chinese_tokenizer() {
assert_eq!(
ChineseTokenizer::new(
"孫子曰:兵者,國之大事,死生之地,存亡之道,不可不察也。",
40
)
.collect::<Vec<_>>(),
vec![
Token {
word: "".into(),
offset: 0,
len: 3
},
Token {
word: "".into(),
offset: 3,
len: 3
},
Token {
word: "".into(),
offset: 6,
len: 3
},
Token {
word: "".into(),
offset: 12,
len: 3
},
Token {
word: "".into(),
offset: 15,
len: 3
},
Token {
word: "".into(),
offset: 21,
len: 3
},
Token {
word: "".into(),
offset: 24,
len: 3
},
Token {
word: "大事".into(),
offset: 27,
len: 6
},
Token {
word: "".into(),
offset: 36,
len: 3
},
Token {
word: "".into(),
offset: 39,
len: 3
},
Token {
word: "".into(),
offset: 42,
len: 3
},
Token {
word: "".into(),
offset: 45,
len: 3
},
Token {
word: "存亡".into(),
offset: 51,
len: 6
},
Token {
word: "".into(),
offset: 57,
len: 3
},
Token {
word: "".into(),
offset: 60,
len: 3
},
Token {
word: "不可不".into(),
offset: 66,
len: 9
},
Token {
word: "".into(),
offset: 75,
len: 3
},
Token {
word: "".into(),
offset: 78,
len: 3
}
]
);
}
}

167
src/fts/tokenizers/indo_european.rs Normal file
View file

@ -0,0 +1,167 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::str::CharIndices;
use super::Token;
pub struct IndoEuropeanTokenizer<'x> {
max_token_length: usize,
text: &'x str,
iterator: CharIndices<'x>,
}
impl<'x> IndoEuropeanTokenizer<'x> {
pub fn new(text: &str, max_token_length: usize) -> IndoEuropeanTokenizer {
IndoEuropeanTokenizer {
max_token_length,
text,
iterator: text.char_indices(),
}
}
}
/// Parses indo-european text into lowercase tokens.
impl<'x> Iterator for IndoEuropeanTokenizer<'x> {
type Item = Token<'x>;
fn next(&mut self) -> Option<Self::Item> {
while let Some((token_start, ch)) = self.iterator.next() {
if ch.is_alphanumeric() {
let mut is_uppercase = ch.is_uppercase();
let token_end = (&mut self.iterator)
.filter_map(|(pos, ch)| {
if ch.is_alphanumeric() {
if !is_uppercase && ch.is_uppercase() {
is_uppercase = true;
}
None
} else {
pos.into()
}
})
.next()
.unwrap_or(self.text.len());
let token_len = token_end - token_start;
if token_end > token_start && token_len <= self.max_token_length {
return Token::new(
token_start,
token_len,
if is_uppercase {
self.text[token_start..token_end].to_lowercase().into()
} else {
self.text[token_start..token_end].into()
},
)
.into();
}
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn indo_european_tokenizer() {
let inputs = [
(
"The quick brown fox jumps over the lazy dog",
vec![
Token::new(0, 3, "the".into()),
Token::new(4, 5, "quick".into()),
Token::new(10, 5, "brown".into()),
Token::new(16, 3, "fox".into()),
Token::new(20, 5, "jumps".into()),
Token::new(26, 4, "over".into()),
Token::new(31, 3, "the".into()),
Token::new(35, 4, "lazy".into()),
Token::new(40, 3, "dog".into()),
],
),
(
"Jovencillo EMPONZOÑADO de whisky: ¡qué figurota exhibe!",
vec![
Token::new(0, 10, "jovencillo".into()),
Token::new(11, 12, "emponzoñado".into()),
Token::new(24, 2, "de".into()),
Token::new(27, 6, "whisky".into()),
Token::new(37, 4, "qué".into()),
Token::new(42, 8, "figurota".into()),
Token::new(51, 6, "exhibe".into()),
],
),
(
"ZWÖLF Boxkämpfer jagten Victor quer über den großen Sylter Deich",
vec![
Token::new(0, 6, "zwölf".into()),
Token::new(7, 11, "boxkämpfer".into()),
Token::new(19, 6, "jagten".into()),
Token::new(26, 6, "victor".into()),
Token::new(33, 4, "quer".into()),
Token::new(38, 5, "über".into()),
Token::new(44, 3, "den".into()),
Token::new(48, 7, "großen".into()),
Token::new(56, 6, "sylter".into()),
Token::new(63, 5, "deich".into()),
],
),
(
"Съешь ещё этих мягких французских булок, да выпей же чаю",
vec![
Token::new(0, 10, "съешь".into()),
Token::new(11, 6, "ещё".into()),
Token::new(18, 8, "этих".into()),
Token::new(27, 12, "мягких".into()),
Token::new(40, 22, "французских".into()),
Token::new(63, 10, "булок".into()),
Token::new(75, 4, "да".into()),
Token::new(80, 10, "выпей".into()),
Token::new(91, 4, "же".into()),
Token::new(96, 6, "чаю".into()),
],
),
(
"Pijamalı hasta yağız şoföre çabucak güvendi",
vec![
Token::new(0, 9, "pijamalı".into()),
Token::new(10, 5, "hasta".into()),
Token::new(16, 7, "yağız".into()),
Token::new(24, 8, "şoföre".into()),
Token::new(33, 8, "çabucak".into()),
Token::new(42, 8, "güvendi".into()),
],
),
];
for (input, tokens) in inputs.iter() {
for (pos, token) in IndoEuropeanTokenizer::new(input, 40).enumerate() {
assert_eq!(token, tokens[pos]);
}
}
}
}

168
src/fts/tokenizers/japanese.rs Normal file
View file

@ -0,0 +1,168 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::vec::IntoIter;
use super::{word::WordTokenizer, Token};
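// Tokenizer that hands non-ASCII word runs to TinySegmenter and passes ASCII words
// through unchanged, subject to the maximum token length.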
pub struct JapaneseTokenizer<'x> {
word_tokenizer: WordTokenizer<'x>,
tokens: IntoIter<String>,
token_offset: usize,
token_len: usize,
token_len_cur: usize,
max_token_length: usize,
}
impl<'x> JapaneseTokenizer<'x> {
pub fn new(text: &str, max_token_length: usize) -> JapaneseTokenizer {
JapaneseTokenizer {
word_tokenizer: WordTokenizer::new(text),
tokens: Vec::new().into_iter(),
max_token_length,
token_offset: 0,
token_len: 0,
token_len_cur: 0,
}
}
}
impl<'x> Iterator for JapaneseTokenizer<'x> {
type Item = Token<'x>;
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(jp_token) = self.tokens.next() {
let offset_start = self.token_offset + self.token_len_cur;
self.token_len_cur += jp_token.len();
if jp_token.len() <= self.max_token_length {
return Token::new(offset_start, jp_token.len(), jp_token.into()).into();
}
} else {
loop {
let (token, is_ascii) = self.word_tokenizer.next()?;
if !is_ascii {
self.tokens = tinysegmenter::tokenize(token.word.as_ref()).into_iter();
self.token_offset = token.offset as usize;
self.token_len = token.len as usize;
self.token_len_cur = 0;
break;
} else if token.len as usize <= self.max_token_length {
return token.into();
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn japanese_tokenizer() {
assert_eq!(
JapaneseTokenizer::new("お先に失礼します あなたの名前は何ですか 123 abc-872", 40)
.collect::<Vec<_>>(),
vec![
Token {
word: "お先".into(),
offset: 0,
len: 6
},
Token {
word: "".into(),
offset: 6,
len: 3
},
Token {
word: "失礼".into(),
offset: 9,
len: 6
},
Token {
word: "".into(),
offset: 15,
len: 3
},
Token {
word: "ます".into(),
offset: 18,
len: 6
},
Token {
word: "あなた".into(),
offset: 25,
len: 9
},
Token {
word: "".into(),
offset: 34,
len: 3
},
Token {
word: "名前".into(),
offset: 37,
len: 6
},
Token {
word: "".into(),
offset: 43,
len: 3
},
Token {
word: "".into(),
offset: 46,
len: 3
},
Token {
word: "です".into(),
offset: 49,
len: 6
},
Token {
word: "".into(),
offset: 55,
len: 3
},
Token {
word: "123".into(),
offset: 59,
len: 3
},
Token {
word: "abc".into(),
offset: 63,
len: 3
},
Token {
word: "872".into(),
offset: 67,
len: 3
}
]
);
}
}

95
src/fts/tokenizers/mod.rs Normal file
View file

@ -0,0 +1,95 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
pub mod chinese;
pub mod indo_european;
pub mod japanese;
pub mod word;
use std::borrow::Cow;
use self::{
chinese::ChineseTokenizer, indo_european::IndoEuropeanTokenizer, japanese::JapaneseTokenizer,
};
use super::Language;
#[derive(Debug, PartialEq, Eq)]
pub struct Token<'x> {
pub word: Cow<'x, str>,
pub offset: u32, // Word offset in the text part
pub len: u8, // Word length
}
impl<'x> Token<'x> {
pub fn new(offset: usize, len: usize, word: Cow<'x, str>) -> Token<'x> {
debug_assert!(offset <= u32::max_value() as usize);
debug_assert!(len <= u8::max_value() as usize);
Token {
offset: offset as u32,
len: len as u8,
word,
}
}
}
enum LanguageTokenizer<'x> {
IndoEuropean(IndoEuropeanTokenizer<'x>),
Japanese(JapaneseTokenizer<'x>),
Chinese(ChineseTokenizer<'x>),
}
pub struct Tokenizer<'x> {
tokenizer: LanguageTokenizer<'x>,
}
impl<'x> Tokenizer<'x> {
pub fn new(text: &'x str, language: Language, max_token_length: usize) -> Self {
Tokenizer {
tokenizer: match language {
Language::Japanese => {
LanguageTokenizer::Japanese(JapaneseTokenizer::new(text, max_token_length))
}
Language::Mandarin => {
LanguageTokenizer::Chinese(ChineseTokenizer::new(text, max_token_length))
}
_ => LanguageTokenizer::IndoEuropean(IndoEuropeanTokenizer::new(
text,
max_token_length,
)),
},
}
}
}
impl<'x> Iterator for Tokenizer<'x> {
type Item = Token<'x>;
fn next(&mut self) -> Option<Self::Item> {
match &mut self.tokenizer {
LanguageTokenizer::IndoEuropean(tokenizer) => tokenizer.next(),
LanguageTokenizer::Chinese(tokenizer) => tokenizer.next(),
LanguageTokenizer::Japanese(tokenizer) => tokenizer.next(),
}
}
}

80
src/fts/tokenizers/word.rs Normal file
View file

@ -0,0 +1,80 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::str::CharIndices;
use super::Token;
pub struct WordTokenizer<'x> {
text: &'x str,
iterator: CharIndices<'x>,
}
impl<'x> WordTokenizer<'x> {
pub fn new(text: &str) -> WordTokenizer {
WordTokenizer {
text,
iterator: text.char_indices(),
}
}
}
/// Parses text into tokens, used by non-IndoEuropean tokenizers.
impl<'x> Iterator for WordTokenizer<'x> {
type Item = (Token<'x>, bool);
fn next(&mut self) -> Option<Self::Item> {
let mut is_ascii = true;
while let Some((token_start, ch)) = self.iterator.next() {
if ch.is_alphanumeric() {
let token_end = (&mut self.iterator)
.filter_map(|(pos, ch)| {
if ch.is_alphanumeric() {
if is_ascii && !ch.is_ascii() {
is_ascii = false;
}
None
} else {
pos.into()
}
})
.next()
.unwrap_or(self.text.len());
let token_len = token_end - token_start;
if token_end > token_start {
return (
Token::new(
token_start,
token_len,
self.text[token_start..token_end].into(),
),
is_ascii,
)
.into();
}
}
}
None
}
}

91
src/lib.rs Normal file
View file

@ -0,0 +1,91 @@
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
pub mod backend;
pub mod fts;
pub mod query;
pub mod write;
pub struct Store {
db: OptimisticTransactionDB<MultiThreaded>,
}
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> Option<Self>;
}
pub trait Serialize {
fn serialize(self) -> Vec<u8>;
}
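// Key addressing a bitmap (document ids, terms, or tags) for a given account,
// collection, bitmap family, and field.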
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BitmapKey<'x> {
pub account_id: u32,
pub collection: u8,
pub family: u8,
pub field: u8,
pub key: &'x [u8],
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey<'x> {
pub account_id: u32,
pub collection: u8,
pub field: u8,
pub key: &'x [u8],
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ValueKey {
pub account_id: u32,
pub collection: u8,
pub document_id: u32,
pub field: u8,
}
pub type Result<T> = std::result::Result<T, Error>;
pub enum Error {
NotFound,
InternalError(String),
}
pub const BM_DOCUMENT_IDS: u8 = 0;
pub const BM_TERM: u8 = 0x10;
pub const BM_TAG: u8 = 0x20;
pub const TERM_EXACT: u8 = 0x00;
pub const TERM_STEMMED: u8 = 0x01;
pub const TERM_STRING: u8 = 0x02;
pub const TERM_HASH: u8 = 0x04;
pub const TAG_ID: u8 = 0x00;
pub const TAG_TEXT: u8 = 0x01;
pub const TAG_STATIC: u8 = 0x02;
#[cfg(test)]
mod tests {
use rand::Rng;
use roaring::RoaringBitmap;
use super::*;
#[test]
fn it_works() {
let mut rb1 = RoaringBitmap::new();
let mut rb2 = RoaringBitmap::new();
let total = rand::thread_rng().gen_range(0..100000);
println!("total: {}", total);
for num in 0..total {
rb1.insert(rand::thread_rng().gen_range(0..u32::MAX));
rb2.insert(num);
}
println!("sparse: {}", rb1.serialized_size());
println!("compact: {}", rb2.serialized_size());
println!(
"ratio: {}",
rb1.serialized_size() as f64 / rb2.serialized_size() as f64
);
}
}

299
src/query/filter.rs Normal file
View file

@ -0,0 +1,299 @@
use std::{
borrow::Cow,
ops::{BitAndAssign, BitOrAssign, BitXorAssign},
};
use ahash::AHashSet;
use roaring::RoaringBitmap;
use crate::{
fts::{
index::MAX_TOKEN_LENGTH, stemmer::Stemmer, term_index::TermIndex, tokenizers::Tokenizer,
},
write::Tokenize,
BitmapKey, Error, IndexKey, Store, ValueKey, BM_TERM, TERM_EXACT, TERM_STEMMED,
};
use super::{Filter, ResultSet};
struct State {
op: Filter,
bm: Option<RoaringBitmap>,
}
impl Store {
pub fn filter(
&self,
account_id: u32,
collection: u8,
filters: Vec<Filter>,
) -> crate::Result<ResultSet> {
let document_ids = self
.get_document_ids(account_id, collection)?
.unwrap_or_else(RoaringBitmap::new);
let mut state: State = Filter::And.into();
let mut stack = Vec::new();
let mut filters = filters.into_iter().peekable();
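// Filters are evaluated as a stack machine: And/Or/Not push the current state and
// start a new one, End pops it and folds the result into the parent, and leaf
// filters are applied to the bitmap of the current state.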
while let Some(filter) = filters.next() {
match filter {
Filter::HasKeyword { field, value } => {
state.op.apply(
&mut state.bm,
self.get_bitmap(BitmapKey {
account_id,
collection,
family: BM_TERM | TERM_EXACT,
field,
key: value.as_bytes(),
})?,
&document_ids,
);
}
Filter::HasKeywords { field, value } => {
let tokens = value.tokenize();
state.op.apply(
&mut state.bm,
self.get_bitmaps_intersection(
tokens
.iter()
.map(|key| BitmapKey {
account_id,
collection,
family: BM_TERM | TERM_EXACT,
field,
key,
})
.collect(),
)?,
&document_ids,
);
}
Filter::MatchValue { field, op, value } => {
state.op.apply(
&mut state.bm,
self.range_to_bitmap(
IndexKey {
account_id,
collection,
field,
key: &value,
},
op,
)?,
&document_ids,
);
}
Filter::HasText {
field,
text,
language,
match_phrase,
} => {
if match_phrase {
let phrase = Tokenizer::new(&text, language, MAX_TOKEN_LENGTH)
.map(|token| token.word)
.collect::<Vec<_>>();
let mut keys = Vec::with_capacity(phrase.len());
for word in &phrase {
let key = BitmapKey {
account_id,
collection,
family: BM_TERM | TERM_EXACT,
field,
key: word.as_bytes(),
};
if !keys.contains(&key) {
keys.push(key);
}
}
// Retrieve the Term Index for each candidate and match the exact phrase
if let Some(candidates) = self.get_bitmaps_intersection(keys)? {
let mut results = RoaringBitmap::new();
for document_id in candidates.iter() {
if let Some(term_index) = self.get_value::<TermIndex>(ValueKey {
account_id,
collection,
document_id,
field: u8::MAX,
})? {
if term_index
.match_terms(
&phrase
.iter()
.map(|w| term_index.get_match_term(w, None))
.collect::<Vec<_>>(),
None,
true,
false,
false,
)
.map_err(|e| {
Error::InternalError(format!(
"Corrupted TermIndex for {}: {:?}",
document_id, e
))
})?
.is_some()
{
results.insert(document_id);
}
}
}
state.op.apply(&mut state.bm, results.into(), &document_ids);
} else {
state.op.apply(&mut state.bm, None, &document_ids);
}
} else {
let words = Stemmer::new(&text, language, MAX_TOKEN_LENGTH)
.map(|token| (token.word, token.stemmed_word.unwrap_or(Cow::from(""))))
.collect::<AHashSet<_>>();
let mut requested_keys = AHashSet::default();
let mut text_bitmap = None;
for (word, stemmed_word) in &words {
let mut keys = Vec::new();
for (word, family) in [
(word, BM_TERM | TERM_EXACT),
(word, BM_TERM | TERM_STEMMED),
(stemmed_word, BM_TERM | TERM_EXACT),
(stemmed_word, BM_TERM | TERM_STEMMED),
] {
if !word.is_empty() {
let key = BitmapKey {
account_id,
collection,
family,
field,
key: word.as_bytes(),
};
if !requested_keys.contains(&key) {
requested_keys.insert(key);
keys.push(key);
}
}
}
// Term already matched on a previous iteration
if keys.is_empty() {
continue;
}
Filter::And.apply(
&mut text_bitmap,
self.get_bitmaps_union(keys)?,
&document_ids,
);
if text_bitmap.as_ref().unwrap().is_empty() {
break;
}
}
state.op.apply(&mut state.bm, text_bitmap, &document_ids);
}
}
Filter::InBitmap { family, field, key } => {
state.op.apply(
&mut state.bm,
self.get_bitmap(BitmapKey {
account_id,
collection,
family,
field,
key: &key,
})?,
&document_ids,
);
}
Filter::DocumentSet(set) => {
state.op.apply(&mut state.bm, Some(set), &document_ids);
}
op @ (Filter::And | Filter::Or | Filter::Not) => {
stack.push(state);
state = op.into();
continue;
}
Filter::End => {
if let Some(mut prev_state) = stack.pop() {
prev_state
.op
.apply(&mut prev_state.bm, state.bm, &document_ids);
state = prev_state;
} else {
break;
}
}
}
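// Short-circuit: once an AND group's bitmap is empty it can no longer match, so
// consume the remaining filters up to the group's End marker.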
if matches!(state.op, Filter::And) && state.bm.as_ref().unwrap().is_empty() {
while let Some(filter) = filters.peek() {
if matches!(filter, Filter::End) {
break;
} else {
filters.next();
}
}
}
}
Ok(ResultSet {
results: state.bm.unwrap_or_else(RoaringBitmap::new),
document_ids,
})
}
}
impl Filter {
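// Folds `src` into `dest` according to this operator; `not_mask` holds every document
// id in the collection and is used to complement bitmaps for NOT.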
#[inline(always)]
pub fn apply(
&self,
dest: &mut Option<RoaringBitmap>,
mut src: Option<RoaringBitmap>,
not_mask: &RoaringBitmap,
) {
if let Some(dest) = dest {
match self {
Filter::And => {
if let Some(src) = src {
dest.bitand_assign(src);
} else {
dest.clear();
}
}
Filter::Or => {
if let Some(src) = src {
dest.bitor_assign(src);
}
}
Filter::Not => {
if let Some(mut src) = src {
src.bitxor_assign(not_mask);
dest.bitand_assign(src);
}
}
_ => unreachable!(),
}
} else if let Some(ref mut src_) = src {
if let Filter::Not = self {
src_.bitxor_assign(not_mask);
}
*dest = src;
} else if let Filter::Not = self {
*dest = Some(not_mask.clone());
} else {
*dest = Some(RoaringBitmap::new());
}
}
}
impl From<Filter> for State {
fn from(value: Filter) -> Self {
Self {
op: value,
bm: None,
}
}
}

160
src/query/mod.rs Normal file
View file

@ -0,0 +1,160 @@
pub mod filter;
pub mod sort;
use roaring::RoaringBitmap;
use crate::{
fts::{lang::LanguageDetector, Language},
Serialize,
};
#[derive(Debug, Clone, Copy)]
pub enum Operator {
LowerThan,
LowerEqualThan,
GreaterThan,
GreaterEqualThan,
Equal,
}
#[derive(Debug)]
pub enum Filter {
HasKeyword {
field: u8,
value: String,
},
HasKeywords {
field: u8,
value: String,
},
MatchValue {
field: u8,
op: Operator,
value: Vec<u8>,
},
HasText {
field: u8,
text: String,
language: Language,
match_phrase: bool,
},
InBitmap {
family: u8,
field: u8,
key: Vec<u8>,
},
DocumentSet(RoaringBitmap),
And,
Or,
Not,
End,
}
#[derive(Debug)]
pub enum Comparator {
Field { field: u8, ascending: bool },
DocumentSet { set: RoaringBitmap, ascending: bool },
}
pub struct ResultSet {
results: RoaringBitmap,
document_ids: RoaringBitmap,
}
pub struct SortedResultRet {
pub position: i32,
pub ids: Vec<u32>,
pub found_anchor: bool,
}
impl Filter {
pub fn new_condition(field: impl Into<u8>, op: Operator, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op,
value: value.serialize(),
}
}
pub fn eq(field: impl Into<u8>, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op: Operator::Equal,
value: value.serialize(),
}
}
pub fn lt(field: impl Into<u8>, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op: Operator::LowerThan,
value: value.serialize(),
}
}
pub fn le(field: impl Into<u8>, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op: Operator::LowerEqualThan,
value: value.serialize(),
}
}
pub fn gt(field: impl Into<u8>, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op: Operator::GreaterThan,
value: value.serialize(),
}
}
pub fn ge(field: impl Into<u8>, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op: Operator::GreaterEqualThan,
value: value.serialize(),
}
}
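// Full-text filter: text wrapped in single or double quotes is matched as a phrase.
// When the language is Unknown it is taken from an optional ISO-639 prefix
// (e.g. "en:...") or detected automatically from the text.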
pub fn match_text(field: impl Into<u8>, mut text: String, mut language: Language) -> Self {
let match_phrase = (text.starts_with('"') && text.ends_with('"'))
|| (text.starts_with('\'') && text.ends_with('\''));
if !match_phrase && language == Language::Unknown {
language = if let Some((l, t)) = text
.split_once(':')
.and_then(|(l, t)| (Language::from_iso_639(l)?, t.to_string()).into())
{
text = t;
l
} else {
LanguageDetector::detect_single(&text)
.and_then(|(l, c)| if c > 0.3 { Some(l) } else { None })
.unwrap_or(Language::Unknown)
};
}
Filter::HasText {
field: field.into(),
text,
language,
match_phrase,
}
}
}
impl Comparator {
pub fn ascending(field: impl Into<u8>) -> Self {
Self::Field {
field: field.into(),
ascending: true,
}
}
pub fn descending(field: impl Into<u8>) -> Self {
Self::Field {
field: field.into(),
ascending: false,
}
}
}

424
src/query/sort.rs Normal file
View file

@ -0,0 +1,424 @@
use std::ops::{BitAndAssign, BitXorAssign};
use roaring::RoaringBitmap;
use rocksdb::{
DBIteratorWithThreadMode, Direction, IteratorMode, MultiThreaded, OptimisticTransactionDB,
};
use crate::{
backend::rocksdb::{ACCOUNT_KEY_LEN, CF_INDEXES},
write::key::KeySerializer,
Error, Store,
};
use super::{Comparator, ResultSet, SortedResultRet};
enum IndexType<'x> {
DocumentSet {
set: RoaringBitmap,
it: Option<roaring::bitmap::IntoIter>,
},
DB {
it: Option<DBIteratorWithThreadMode<'x, OptimisticTransactionDB<MultiThreaded>>>,
prefix: Vec<u8>,
start_key: Vec<u8>,
ascending: bool,
prev_item: Option<u32>,
prev_key: Option<Box<[u8]>>,
},
}
struct IndexIterator<'x> {
index: IndexType<'x>,
remaining: RoaringBitmap,
eof: bool,
}
impl Store {
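// Sorts a result set by chaining comparators: each index iterator drains the ids still
// remaining and forwards ties to the next comparator, while pagination (position,
// anchor, anchor_offset) is applied as the sorted ids are produced.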
#[allow(clippy::too_many_arguments)]
pub fn sort(
&self,
account_id: u32,
collection: u8,
mut result_set: ResultSet,
comparators: Vec<Comparator>,
limit: usize,
mut position: i32,
anchor: Option<u32>,
mut anchor_offset: i32,
) -> crate::Result<SortedResultRet> {
let has_anchor = anchor.is_some();
let mut anchor_found = false;
let requested_position = position;
let mut result = SortedResultRet {
position,
ids: Vec::with_capacity(std::cmp::min(limit, result_set.results.len() as usize)),
found_anchor: true,
};
let mut iterators = comparators
.into_iter()
.map(|comp| IndexIterator {
index: match comp {
Comparator::Field { field, ascending } => {
let prefix = KeySerializer::new(ACCOUNT_KEY_LEN)
.write(account_id)
.write(collection)
.write(field)
.finalize();
IndexType::DB {
it: None,
start_key: if !ascending {
let (key_account_id, key_collection, key_field) = if field < u8::MAX
{
(account_id, collection, field + 1)
} else if (collection) < u8::MAX {
(account_id, (collection) + 1, field)
} else {
(account_id + 1, collection, field)
};
KeySerializer::new(ACCOUNT_KEY_LEN)
.write(key_account_id)
.write(key_collection)
.write(key_field)
.finalize()
} else {
prefix.clone()
},
prefix,
ascending,
prev_item: None,
prev_key: None,
}
}
Comparator::DocumentSet { mut set, ascending } => IndexType::DocumentSet {
set: if !ascending {
if !set.is_empty() {
set.bitxor_assign(&result_set.document_ids);
set
} else {
result_set.document_ids.clone()
}
} else {
set
},
it: None,
},
},
remaining: std::mem::replace(&mut result_set.results, RoaringBitmap::new()),
eof: false,
})
.collect::<Vec<_>>();
let mut current = 0;
'outer: loop {
let mut doc_id;
'inner: loop {
let (it_opts, mut next_it_opts) = if current < iterators.len() - 1 {
let (iterators_first, iterators_last) = iterators.split_at_mut(current + 1);
(
iterators_first.last_mut().unwrap(),
iterators_last.first_mut(),
)
} else {
(&mut iterators[current], None)
};
if !matches!(it_opts.index, IndexType::DB { prev_item,.. } if prev_item.is_some())
{
if it_opts.remaining.is_empty() {
if current > 0 {
current -= 1;
continue 'inner;
} else {
break 'outer;
}
} else if it_opts.remaining.len() == 1 || it_opts.eof {
doc_id = it_opts.remaining.min().unwrap();
it_opts.remaining.remove(doc_id);
break 'inner;
}
}
match &mut it_opts.index {
IndexType::DB {
it,
prefix,
start_key,
ascending,
prev_item,
prev_key,
} => {
let it = if let Some(it) = it {
it
} else {
*it = Some(self.db.iterator_cf(
&self.db.cf_handle(CF_INDEXES).unwrap(),
IteratorMode::From(
start_key,
if *ascending {
Direction::Forward
} else {
Direction::Reverse
},
),
));
it.as_mut().unwrap()
};
let mut prev_key_prefix = prev_key
.as_ref()
.and_then(|k| k.get(..k.len() - std::mem::size_of::<u32>()))
.unwrap_or_default();
if let Some(prev_item) = prev_item.take() {
if let Some(next_it_opts) = &mut next_it_opts {
next_it_opts.remaining.insert(prev_item);
} else {
doc_id = prev_item;
break 'inner;
}
}
let mut is_eof = false;
loop {
if let Some(result) = it.next() {
let (key, _) = result.map_err(|e| {
Error::InternalError(format!("Iterator error: {}", e))
})?;
if !key.starts_with(prefix) {
*prev_key = None;
is_eof = true;
break;
}
doc_id = u32::from_be_bytes(
key.get(key.len() - std::mem::size_of::<u32>()..)
.ok_or_else(|| {
Error::InternalError("Invalid index entry".to_string())
})?
.try_into()
.unwrap(),
);
if it_opts.remaining.contains(doc_id) {
it_opts.remaining.remove(doc_id);
if let Some(next_it_opts) = &mut next_it_opts {
if let Some(prev_key_) = &*prev_key {
if key.len() != prev_key_.len()
|| !key.starts_with(prev_key_prefix)
{
*prev_item = Some(doc_id);
*prev_key = Some(key);
break;
}
} else {
*prev_key = Some(key);
prev_key_prefix = prev_key
.as_ref()
.and_then(|key| {
key.get(
..key.len() - std::mem::size_of::<u32>(),
)
})
.ok_or_else(|| {
Error::InternalError(
"Invalid index entry".to_string(),
)
})?;
}
next_it_opts.remaining.insert(doc_id);
} else {
// doc id found
break 'inner;
}
}
} else {
is_eof = true;
break;
}
}
if is_eof {
if let Some(next_it_opts) = &mut next_it_opts {
if !it_opts.remaining.is_empty() {
next_it_opts.remaining |= &it_opts.remaining;
it_opts.remaining.clear();
}
*prev_key = None;
it_opts.eof = true;
}
}
}
IndexType::DocumentSet { set, it } => {
if let Some(it) = it {
if let Some(_doc_id) = it.next() {
doc_id = _doc_id;
break 'inner;
}
} else {
let mut set = set.clone();
set.bitand_assign(&it_opts.remaining);
let set_len = set.len();
if set_len > 0 {
it_opts.remaining.bitxor_assign(&set);
match &mut next_it_opts {
Some(next_it_opts) if set_len > 1 => {
next_it_opts.remaining = set;
}
_ if set_len == 1 => {
doc_id = set.min().unwrap();
break 'inner;
}
_ => {
let mut it_ = set.into_iter();
let result = it_.next();
*it = Some(it_);
if let Some(result) = result {
doc_id = result;
break 'inner;
} else {
break 'outer;
}
}
}
} else if !it_opts.remaining.is_empty() {
if let Some(ref mut next_it_opts) = next_it_opts {
next_it_opts.remaining = std::mem::take(&mut it_opts.remaining);
}
}
};
}
};
if let Some(next_it_opts) = next_it_opts {
if !next_it_opts.remaining.is_empty() {
if next_it_opts.remaining.len() == 1 {
doc_id = next_it_opts.remaining.min().unwrap();
next_it_opts.remaining.remove(doc_id);
break 'inner;
} else {
match &mut next_it_opts.index {
IndexType::DB {
it,
start_key,
ascending,
prev_item,
prev_key,
..
} => {
if let Some(it) = it {
*it = self.db.iterator_cf(
&self.db.cf_handle(CF_INDEXES).unwrap(),
IteratorMode::From(
start_key,
if *ascending {
Direction::Forward
} else {
Direction::Reverse
},
),
);
}
*prev_item = None;
*prev_key = None;
}
IndexType::DocumentSet { it, .. } => {
*it = None;
}
}
current += 1;
next_it_opts.eof = false;
continue 'inner;
}
}
}
it_opts.eof = true;
if it_opts.remaining.is_empty() {
if current > 0 {
current -= 1;
} else {
break 'outer;
}
}
}
// Pagination
if !has_anchor {
if position >= 0 {
if position > 0 {
position -= 1;
} else {
result.ids.push(doc_id);
if limit > 0 && result.ids.len() == limit {
break 'outer;
}
}
} else {
result.ids.push(doc_id);
}
} else if anchor_offset >= 0 {
if !anchor_found {
if &doc_id != anchor.as_ref().unwrap() {
continue 'outer;
}
anchor_found = true;
}
if anchor_offset > 0 {
anchor_offset -= 1;
} else {
result.ids.push(doc_id);
if limit > 0 && result.ids.len() == limit {
break 'outer;
}
}
} else {
anchor_found = &doc_id == anchor.as_ref().unwrap();
result.ids.push(doc_id);
if !anchor_found {
continue 'outer;
}
position = anchor_offset;
break 'outer;
}
}
if !has_anchor || anchor_found {
if !has_anchor && requested_position >= 0 {
result.position = if position == 0 { requested_position } else { 0 };
} else if position >= 0 {
result.position = position;
} else {
let position = position.unsigned_abs() as usize;
let start_offset = if position < result.ids.len() {
result.ids.len() - position
} else {
0
};
result.position = start_offset as i32;
let end_offset = if limit > 0 {
std::cmp::min(start_offset + limit, result.ids.len())
} else {
result.ids.len()
};
result.ids = result.ids[start_offset..end_offset].to_vec()
}
} else {
result.found_anchor = false;
}
Ok(result)
}
}

343
src/rocksdb.rs Normal file
View file

@ -0,0 +1,343 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{convert::TryInto, path::PathBuf, sync::Arc};
use rocksdb::{
BoundColumnFamily, ColumnFamilyDescriptor, DBIteratorWithThreadMode, MergeOperands,
MultiThreaded, OptimisticTransactionDB, Options,
};
use crate::{Deserialize, Error, InnerStore};
pub struct RocksDB {
db: OptimisticTransactionDB<MultiThreaded>,
}
pub struct RocksDBIterator<'x> {
it: DBIteratorWithThreadMode<'x, OptimisticTransactionDB<MultiThreaded>>,
}
impl Iterator for RocksDBIterator<'_> {
type Item = (Box<[u8]>, Box<[u8]>);
#[allow(clippy::while_let_on_iterator)]
#[inline(always)]
fn next(&mut self) -> Option<Self::Item> {
while let Some(result) = self.it.next() {
if let Ok(item) = result {
return Some(item);
}
}
None
}
}
impl InnerStore for RocksDB {
type Iterator<'x> = RocksDBIterator<'x>;
#[inline(always)]
fn delete(&self, cf: crate::ColumnFamily, key: &[u8]) -> crate::Result<()> {
self.db
.delete_cf(&self.cf_handle(cf)?, key)
.map_err(|err| Error::InternalError(format!("delete_cf failed: {}", err)))
}
#[inline(always)]
fn set(&self, cf: crate::ColumnFamily, key: &[u8], value: &[u8]) -> crate::Result<()> {
self.db
.put_cf(&self.cf_handle(cf)?, key, value)
.map_err(|err| Error::InternalError(format!("put_cf failed: {}", err)))
}
#[inline(always)]
fn get<U>(&self, cf: crate::ColumnFamily, key: &[u8]) -> crate::Result<Option<U>>
where
U: Deserialize,
{
if let Some(bytes) = self
.db
.get_pinned_cf(&self.cf_handle(cf)?, key)
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
Ok(Some(U::deserialize(&bytes).ok_or_else(|| {
Error::DeserializeError(format!("Failed to deserialize key: {:?}", key))
})?))
} else {
Ok(None)
}
}
#[inline(always)]
fn merge(&self, cf: crate::ColumnFamily, key: &[u8], value: &[u8]) -> crate::Result<()> {
self.db
.merge_cf(&self.cf_handle(cf)?, key, value)
.map_err(|err| Error::InternalError(format!("merge_cf failed: {}", err)))
}
/*
#[inline(always)]
fn write(&self, batch: Vec<WriteOperation>) -> crate::Result<()> {
let mut rocks_batch = rocksdb::WriteBatch::default();
let cf_bitmaps = self.cf_handle(crate::ColumnFamily::Bitmaps)?;
let cf_values = self.cf_handle(crate::ColumnFamily::Values)?;
let cf_indexes = self.cf_handle(crate::ColumnFamily::Indexes)?;
let cf_blobs = self.cf_handle(crate::ColumnFamily::Blobs)?;
let cf_logs = self.cf_handle(crate::ColumnFamily::Logs)?;
for op in batch {
match op {
WriteOperation::Set { cf, key, value } => {
rocks_batch.put_cf(
match cf {
crate::ColumnFamily::Bitmaps => &cf_bitmaps,
crate::ColumnFamily::Values => &cf_values,
crate::ColumnFamily::Indexes => &cf_indexes,
crate::ColumnFamily::Blobs => &cf_blobs,
crate::ColumnFamily::Logs => &cf_logs,
},
key,
value,
);
}
WriteOperation::Delete { cf, key } => {
rocks_batch.delete_cf(
match cf {
crate::ColumnFamily::Bitmaps => &cf_bitmaps,
crate::ColumnFamily::Values => &cf_values,
crate::ColumnFamily::Indexes => &cf_indexes,
crate::ColumnFamily::Blobs => &cf_blobs,
crate::ColumnFamily::Logs => &cf_logs,
},
key,
);
}
WriteOperation::Merge { cf, key, value } => {
rocks_batch.merge_cf(
match cf {
crate::ColumnFamily::Bitmaps => &cf_bitmaps,
crate::ColumnFamily::Values => &cf_values,
crate::ColumnFamily::Indexes => &cf_indexes,
crate::ColumnFamily::Blobs => &cf_blobs,
crate::ColumnFamily::Logs => &cf_logs,
},
key,
value,
);
}
}
}
self.db
.write(rocks_batch)
.map_err(|err| Error::InternalError(format!("batch write failed: {}", err)))
}
*/
#[inline(always)]
fn exists(&self, cf: crate::ColumnFamily, key: &[u8]) -> crate::Result<bool> {
Ok(self
.db
.get_pinned_cf(&self.cf_handle(cf)?, key)
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
.is_some())
}
#[inline(always)]
fn multi_get<T, U>(
&self,
cf: crate::ColumnFamily,
keys: Vec<U>,
) -> crate::Result<Vec<Option<T>>>
where
T: Deserialize,
U: AsRef<[u8]>,
{
let cf_handle = self.cf_handle(cf)?;
let mut results = Vec::with_capacity(keys.len());
for value in self
.db
.multi_get_cf(keys.iter().map(|key| (&cf_handle, key)).collect::<Vec<_>>())
{
results.push(
if let Some(bytes) = value
.map_err(|err| Error::InternalError(format!("multi_get_cf failed: {}", err)))?
{
T::deserialize(&bytes)
.ok_or_else(|| {
Error::DeserializeError("Failed to deserialize keys.".to_string())
})?
.into()
} else {
None
},
);
}
Ok(results)
}
#[inline(always)]
fn iterator<'x>(
&'x self,
cf: crate::ColumnFamily,
start: &[u8],
direction: crate::Direction,
) -> crate::Result<Self::Iterator<'x>> {
Ok(RocksDBIterator {
it: self.db.iterator_cf(
&self.cf_handle(cf)?,
rocksdb::IteratorMode::From(
start,
match direction {
crate::Direction::Forward => rocksdb::Direction::Forward,
crate::Direction::Backward => rocksdb::Direction::Reverse,
},
),
),
})
}
fn compact(&self, cf: crate::ColumnFamily) -> crate::Result<()> {
self.db
.compact_range_cf(&self.cf_handle(cf)?, None::<&[u8]>, None::<&[u8]>);
Ok(())
}
fn open() -> crate::Result<Self> {
// Create the database directory if it doesn't exist
let path = PathBuf::from(
"/tmp/rocksdb.test", /*&settings
.get("db-path")
.unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/
);
let mut idx_path = path;
idx_path.push("idx");
std::fs::create_dir_all(&idx_path).map_err(|err| {
Error::InternalError(format!(
"Failed to create index directory {}: {:?}",
idx_path.display(),
err
))
})?;
// Bitmaps
let cf_bitmaps = {
let mut cf_opts = Options::default();
//cf_opts.set_max_write_buffer_number(16);
//cf_opts.set_merge_operator("merge", bitmap_merge, bitmap_partial_merge);
//cf_opts.set_compaction_filter("compact", bitmap_compact);
ColumnFamilyDescriptor::new("bitmaps", cf_opts)
};
// Stored values
let cf_values = {
let mut cf_opts = Options::default();
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
ColumnFamilyDescriptor::new("values", cf_opts)
};
// Secondary indexes
let cf_indexes = {
let cf_opts = Options::default();
ColumnFamilyDescriptor::new("indexes", cf_opts)
};
// Blobs
let cf_blobs = {
let mut cf_opts = Options::default();
cf_opts.set_enable_blob_files(true);
cf_opts.set_min_blob_size(
16834, /*settings.parse("blob-min-size").unwrap_or(16384) */
);
ColumnFamilyDescriptor::new("blobs", cf_opts)
};
// Raft log and change log
let cf_log = {
let cf_opts = Options::default();
ColumnFamilyDescriptor::new("logs", cf_opts)
};
let mut db_opts = Options::default();
db_opts.create_missing_column_families(true);
db_opts.create_if_missing(true);
Ok(RocksDB {
db: OptimisticTransactionDB::open_cf_descriptors(
&db_opts,
idx_path,
vec![cf_bitmaps, cf_values, cf_indexes, cf_blobs, cf_log],
)
.map_err(|e| Error::InternalError(e.into_string()))?,
})
}
fn close(&self) -> crate::Result<()> {
self.db
.flush()
.map_err(|e| Error::InternalError(e.to_string()))?;
self.db.cancel_all_background_work(true);
Ok(())
}
}
impl RocksDB {
#[inline(always)]
fn cf_handle(&self, cf: crate::ColumnFamily) -> crate::Result<Arc<BoundColumnFamily>> {
self.db
.cf_handle(match cf {
crate::ColumnFamily::Bitmaps => "bitmaps",
crate::ColumnFamily::Values => "values",
crate::ColumnFamily::Indexes => "indexes",
crate::ColumnFamily::Blobs => "blobs",
crate::ColumnFamily::Logs => "logs",
})
.ok_or_else(|| {
Error::InternalError(format!(
"Failed to get handle for '{:?}' column family.",
cf
))
})
}
}
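// Associative merge operator for the values column family: treats the stored value and
// every operand as a little-endian i64 counter and returns their sum.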
pub fn numeric_value_merge(
_key: &[u8],
value: Option<&[u8]>,
operands: &MergeOperands,
) -> Option<Vec<u8>> {
let mut value = if let Some(value) = value {
i64::from_le_bytes(value.try_into().ok()?)
} else {
0
};
for op in operands.iter() {
value += i64::from_le_bytes(op.try_into().ok()?);
}
let mut bytes = Vec::with_capacity(std::mem::size_of::<i64>());
bytes.extend_from_slice(&value.to_le_bytes());
Some(bytes)
}

117
src/write/batch.rs Normal file
View file

@ -0,0 +1,117 @@
use crate::{BM_TERM, TERM_EXACT};
use super::{
Batch, BatchBuilder, HasFlag, IntoBitmap, IntoOperations, Operation, Serialize, Tokenize,
F_CLEAR, F_INDEX, F_TOKENIZE, F_VALUE,
};
impl BatchBuilder {
pub fn new() -> Self {
Self {
ops: Vec::new(),
last_account_id: 0,
last_document_id: 0,
last_collection: 0,
}
}
pub fn with_context(
&mut self,
account_id: u32,
document_id: u32,
collection: impl Into<u8>,
) -> &mut Self {
self.last_account_id = account_id;
self.last_document_id = document_id;
self.last_collection = collection.into();
self.push_context();
self
}
#[inline(always)]
pub(super) fn push_context(&mut self) {
self.ops.push(Operation::WithContext {
account_id: self.last_account_id,
document_id: self.last_document_id,
collection: self.last_collection,
});
}
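// Queues the write operations for a field value: F_TOKENIZE emits a term bitmap
// operation per token, F_INDEX adds a secondary index entry, F_VALUE stores the
// serialized value, and F_CLEAR turns each of these into a removal.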
pub fn value(
&mut self,
field: impl Into<u8>,
value: impl Serialize + Tokenize,
options: u32,
) -> &mut Self {
let field = field.into();
let is_set = !options.has_flag(F_CLEAR);
if options.has_flag(F_TOKENIZE) {
for token in value.tokenize() {
self.ops.push(Operation::Bitmap {
family: BM_TERM | TERM_EXACT,
field,
key: token,
set: is_set,
});
}
}
let value = value.serialize();
if options.has_flag(F_INDEX) {
self.ops.push(Operation::Index {
field,
key: value.clone(),
set: is_set,
});
}
if options.has_flag(F_VALUE) {
self.ops.push(Operation::Value {
field,
set: if is_set { Some(value) } else { None },
});
}
self
}
pub fn bitmap(&mut self, field: impl Into<u8>, value: impl IntoBitmap, options: u32) {
let (key, family) = value.into_bitmap();
self.ops.push(Operation::Bitmap {
family,
field: field.into(),
key,
set: !options.has_flag(F_CLEAR),
});
}
pub fn acl(&mut self, to_account_id: u32, acl: Option<impl Serialize>) {
self.ops.push(Operation::Acl {
to_account_id,
set: acl.map(|acl| acl.serialize()),
})
}
pub fn blob(&mut self, blob_id: impl Serialize, options: u32) {
self.ops.push(Operation::Blob {
key: blob_id.serialize(),
set: !options.has_flag(F_CLEAR),
});
}
pub fn custom(&mut self, value: impl IntoOperations) -> crate::Result<()> {
value.build(self)
}
pub fn build(self) -> Batch {
Batch { ops: self.ops }
}
}
impl Default for BatchBuilder {
fn default() -> Self {
Self::new()
}
}

93
src/write/key.rs Normal file
View file

@ -0,0 +1,93 @@
use std::convert::TryInto;
use utils::codec::leb128::Leb128_;
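// Builds binary keys by concatenating fields; integers are written big-endian so that
// byte-wise key comparison matches their numeric ordering.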
pub struct KeySerializer {
buf: Vec<u8>,
}
pub trait KeySerialize {
fn serialize(&self, buf: &mut Vec<u8>);
}
pub trait DeserializeBigEndian {
fn deserialize_be_u32(&self, index: usize) -> Option<u32>;
fn deserialize_be_u64(&self, index: usize) -> Option<u64>;
}
impl KeySerializer {
pub fn new(capacity: usize) -> Self {
Self {
buf: Vec::with_capacity(capacity),
}
}
pub fn write<T: KeySerialize>(mut self, value: T) -> Self {
value.serialize(&mut self.buf);
self
}
pub fn write_leb128<T: Leb128_>(mut self, value: T) -> Self {
T::to_leb128_bytes(value, &mut self.buf);
self
}
pub fn finalize(self) -> Vec<u8> {
self.buf
}
}
impl KeySerialize for u8 {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.push(*self);
}
}
impl KeySerialize for &str {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(self.as_bytes());
}
}
impl KeySerialize for &String {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(self.as_bytes());
}
}
impl KeySerialize for &[u8] {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(self);
}
}
impl KeySerialize for u32 {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(&self.to_be_bytes());
}
}
impl KeySerialize for u64 {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(&self.to_be_bytes());
}
}
impl DeserializeBigEndian for &[u8] {
fn deserialize_be_u32(&self, index: usize) -> Option<u32> {
u32::from_be_bytes(
self.get(index..index + std::mem::size_of::<u32>())?
.try_into()
.ok()?,
)
.into()
}
fn deserialize_be_u64(&self, index: usize) -> Option<u64> {
u64::from_be_bytes(
self.get(index..index + std::mem::size_of::<u64>())?
.try_into()
.ok()?,
)
.into()
}
}
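
A small sketch of the key helpers above (module path `store::write::key` assumed): fixed-width fields are written big-endian so byte-wise key ordering matches numeric ordering, and they can be decoded back by offset. The one-byte prefix and the id values are illustrative.

use store::write::key::{DeserializeBigEndian, KeySerializer};

fn account_key_roundtrip() {
    let key = KeySerializer::new(1 + 4 + 4)
        .write(2u8)   // hypothetical key-type prefix
        .write(1u32)  // account id
        .write(42u32) // document id
        .finalize();

    // Skip the one-byte prefix and decode the two ids back out.
    assert_eq!(key.as_slice().deserialize_be_u32(1), Some(1));
    assert_eq!(key.as_slice().deserialize_be_u32(5), Some(42));
    // Only four bytes remain at offset 5, so a u64 read returns None.
    assert_eq!(key.as_slice().deserialize_be_u64(5), None);
}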

109
src/write/log.rs Normal file
View file

@@ -0,0 +1,109 @@
use ahash::AHashSet;
use utils::{codec::leb128::Leb128Vec, map::vec_map::VecMap};
use crate::Serialize;
use super::{IntoOperations, Operation};
pub struct ChangeLogBuilder {
pub change_id: u64,
pub changes: VecMap<u8, Change>,
}
#[derive(Default)]
pub struct Change {
pub inserts: AHashSet<u64>,
pub updates: AHashSet<u64>,
pub deletes: AHashSet<u64>,
pub child_updates: AHashSet<u64>,
}
impl ChangeLogBuilder {
pub fn with_change_id(change_id: u64) -> ChangeLogBuilder {
ChangeLogBuilder {
change_id,
changes: VecMap::default(),
}
}
pub fn log_insert(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
self.changes
.get_mut_or_insert(collection.into())
.inserts
.insert(jmap_id.into());
}
pub fn log_update(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
self.changes
.get_mut_or_insert(collection.into())
.updates
.insert(jmap_id.into());
}
pub fn log_child_update(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
self.changes
.get_mut_or_insert(collection.into())
.child_updates
.insert(jmap_id.into());
}
pub fn log_delete(&mut self, collection: impl Into<u8>, jmap_id: impl Into<u64>) {
self.changes
.get_mut_or_insert(collection.into())
.deletes
.insert(jmap_id.into());
}
pub fn log_move(
&mut self,
collection: impl Into<u8>,
old_jmap_id: impl Into<u64>,
new_jmap_id: impl Into<u64>,
) {
let change = self.changes.get_mut_or_insert(collection.into());
change.deletes.insert(old_jmap_id.into());
change.inserts.insert(new_jmap_id.into());
}
}
impl IntoOperations for ChangeLogBuilder {
fn build(self, batch: &mut super::BatchBuilder) -> crate::Result<()> {
for (collection, changes) in self.changes {
if collection != batch.last_collection {
batch.last_collection = collection;
batch.push_context();
}
batch.ops.push(Operation::Log {
change_id: self.change_id,
changes: changes.serialize(),
});
}
Ok(())
}
}
impl Serialize for Change {
fn serialize(self) -> Vec<u8> {
let mut buf = Vec::with_capacity(
1 + (self.inserts.len()
+ self.updates.len()
+ self.child_updates.len()
+ self.deletes.len()
+ 4)
* std::mem::size_of::<usize>(),
);
buf.push_leb128(self.inserts.len());
buf.push_leb128(self.updates.len());
buf.push_leb128(self.child_updates.len());
buf.push_leb128(self.deletes.len());
for list in [self.inserts, self.updates, self.child_updates, self.deletes] {
for id in list {
buf.push_leb128(id);
}
}
buf
}
}
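
A brief sketch of how the change log above plugs into a write batch, again assuming the external crate name `store`; collection codes and document ids are illustrative.

use store::write::{log::ChangeLogBuilder, Batch, BatchBuilder};

fn changes_batch() -> Batch {
    let mut changes = ChangeLogBuilder::with_change_id(1);
    changes.log_insert(0u8, 10u64); // collection 0: document 10 created
    changes.log_update(0u8, 11u64); // collection 0: document 11 modified

    let mut batch = BatchBuilder::new();
    batch.with_context(1, 0, 0u8);
    // Appends one Operation::Log per collection touched by the change log.
    batch
        .custom(changes)
        .expect("building a change log does not fail");
    batch.build()
}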

157
src/write/mod.rs Normal file
View file

@@ -0,0 +1,157 @@
use std::collections::HashSet;
use crate::Serialize;
pub mod batch;
pub mod key;
pub mod log;
pub const F_VALUE: u32 = 1 << 0;
pub const F_INDEX: u32 = 1 << 1;
pub const F_TOKENIZE: u32 = 1 << 2;
pub const F_CLEAR: u32 = 1 << 3;
pub struct Batch {
pub ops: Vec<Operation>,
}
pub struct BatchBuilder {
pub last_account_id: u32,
pub last_document_id: u32,
pub last_collection: u8,
pub ops: Vec<Operation>,
}
pub enum Operation {
WithContext {
account_id: u32,
document_id: u32,
collection: u8,
},
Value {
field: u8,
set: Option<Vec<u8>>,
},
Index {
field: u8,
key: Vec<u8>,
set: bool,
},
Bitmap {
family: u8,
field: u8,
key: Vec<u8>,
set: bool,
},
Blob {
key: Vec<u8>,
set: bool,
},
Acl {
to_account_id: u32,
set: Option<Vec<u8>>,
},
Log {
change_id: u64,
changes: Vec<u8>,
},
}
impl Serialize for u32 {
fn serialize(self) -> Vec<u8> {
self.to_be_bytes().to_vec()
}
}
impl Serialize for u64 {
fn serialize(self) -> Vec<u8> {
self.to_be_bytes().to_vec()
}
}
impl Serialize for f64 {
fn serialize(self) -> Vec<u8> {
self.to_be_bytes().to_vec()
}
}
impl Serialize for &str {
fn serialize(self) -> Vec<u8> {
self.as_bytes().to_vec()
}
}
impl Serialize for String {
fn serialize(self) -> Vec<u8> {
self.into_bytes()
}
}
trait HasFlag {
fn has_flag(&self, flag: u32) -> bool;
}
impl HasFlag for u32 {
#[inline(always)]
fn has_flag(&self, flag: u32) -> bool {
self & flag == flag
}
}
pub trait Tokenize {
fn tokenize(&self) -> HashSet<Vec<u8>>;
}
impl Tokenize for &str {
fn tokenize(&self) -> HashSet<Vec<u8>> {
    let mut tokens = HashSet::new();
    let mut token = String::new();
    for ch in self.chars() {
        if ch.is_alphanumeric() {
            if ch.is_uppercase() {
                // `to_lowercase` may yield more than one character (e.g. 'İ').
                token.extend(ch.to_lowercase());
            } else {
                token.push(ch);
            }
        } else if !token.is_empty() {
            tokens.insert(token.into_bytes());
            token = String::new();
        }
    }
    // Flush the trailing token; otherwise the last word of the input is lost.
    if !token.is_empty() {
        tokens.insert(token.into_bytes());
    }
    tokens
}
}
impl Tokenize for String {
fn tokenize(&self) -> HashSet<Vec<u8>> {
self.as_str().tokenize()
}
}
impl Tokenize for u32 {
fn tokenize(&self) -> HashSet<Vec<u8>> {
unreachable!()
}
}
impl Tokenize for u64 {
fn tokenize(&self) -> HashSet<Vec<u8>> {
unreachable!()
}
}
impl Tokenize for f64 {
fn tokenize(&self) -> HashSet<Vec<u8>> {
unreachable!()
}
}
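
An illustrative check, not part of the diff, of what the &str tokenizer above yields: lowercased alphanumeric runs, deduplicated by the set. The function name `tokenizer_demo` is made up.

use std::collections::HashSet;
use store::write::Tokenize;

fn tokenizer_demo() {
    let tokens: HashSet<Vec<u8>> = "Re: Quarterly SALES sales!".tokenize();
    assert!(tokens.contains("re".as_bytes()));
    assert!(tokens.contains("quarterly".as_bytes()));
    assert!(tokens.contains("sales".as_bytes())); // stored once, despite two case variants
    assert_eq!(tokens.len(), 3);
}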
pub trait IntoBitmap {
fn into_bitmap(self) -> (Vec<u8>, u8);
}
pub trait IntoOperations {
fn build(self, batch: &mut BatchBuilder) -> crate::Result<()>;
}