Bloom text index

Mauro D 2023-03-26 17:02:56 +00:00
parent 8cbe5bebc3
commit 05d53da6fb
22 changed files with 1506 additions and 365 deletions

View file

@ -17,3 +17,11 @@ whatlang = "0.16" # Language detection
rust-stemmers = "1.2" # Stemmers
tinysegmenter = "0.1" # Japanese tokenizer
jieba-rs = "0.6" # Chinese tokenizer
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
farmhash = "1.1.5"
siphasher = "0.3"
[dev-dependencies]
csv = "1.1"
flate2 = { version = "1.0.17", features = ["zlib"], default-features = false }
rayon = "1.5.1"

View file

@ -1,92 +0,0 @@
I have the following SQLite table for storing email data:
CREATE TABLE email (
email_id INTEGER PRIMARY KEY,
blob_id TEXT NOT NULL,
thread_id INTEGER NOT NULL,
size INTEGER NOT NULL,
received_at TIMESTAMP NOT NULL,
message_id TEXT NOT NULL,
in_reply_to TEXT NOT NULL,
sender TEXT NOT NULL,
"from" TEXT NOT NULL,
"to" TEXT NOT NULL,
cc TEXT NOT NULL,
bcc TEXT NOT NULL,
reply_to TEXT NOT NULL,
subject TEXT NOT NULL,
sent_at TIMESTAMP NOT NULL,
has_attachment BOOL NOT NULL,
preview TEXT NOT NULL
);
The mailboxes and keywords for each message are stored in separate tables:
CREATE TABLE email_mailbox (
email_id INTEGER PRIMARY KEY,
mailbox_id INTEGER NOT NULL
);
CREATE TABLE email_keyword (
email_id INTEGER PRIMARY KEY,
keyword TEXT NOT NULL
);
How would you write a SQLite query to list the email IDs of all the Emails with the subject "sales", sorted first by whether a message in the same Thread has the keyword "draft", then by received_at, and then by has_attachment?
[email]
id: INT
blob_id: HASH
thread_id: INT
size: INT
received_at: TIMESTAMP
message_id: TEXT
in_reply_to: TEXT
sender: TEXT
from: TEXT
to: TEXT
cc: TEXT
bcc: TEXT
reply_to: TEXT
subject: TEXT
sent_at: TIMESTAMP
has_attachment: BOOL
preview: TEXT
[email_mailbox]
email_id: INT
mailbox_id: INT
imap_uid: INT
[email_keyword]
email_id: INT
keyword: TEXT
/*
o id
o blobId
o threadId
o mailboxIds
o keywords
o size
o receivedAt
o messageId
o inReplyTo
o sender
o from
o to
o cc
o bcc
o replyTo
o subject
o sentAt
o hasAttachment
o preview
[ "partId", "blobId", "size", "name", "type", "charset",
"disposition", "cid", "language", "location" ]
*/

View file

@ -226,7 +226,7 @@ pub fn bitmap_merge<'x>(
deserialize_bitlist(&mut bm, op);
}
_ => {
debug_assert!(false, "This should not have happend");
debug_assert!(false, "This should not have happened");
return None;
}
}

View file

@ -11,7 +11,7 @@ impl Store {
pub fn open() -> crate::Result<Self> {
// Create the database directory if it doesn't exist
let path = PathBuf::from(
"/tmp/rocksdb.test", /*&settings
"/tmp/rocksdb_test", /*&settings
.get("db-path")
.unwrap_or_else(|| "/usr/local/stalwart-jmap/data".to_string())*/
);

View file

@ -1,8 +1,11 @@
use crate::{write::key::KeySerializer, BitmapKey, IndexKey, Serialize, ValueKey};
use crate::{
write::key::KeySerializer, AclKey, BitmapKey, BlobKey, IndexKey, LogKey, Serialize, ValueKey,
};
pub mod bitmap;
pub mod main;
pub mod read;
pub mod write;
pub const CF_BITMAPS: &str = "b";
pub const CF_VALUES: &str = "v";
@ -15,13 +18,15 @@ pub const FIELD_PREFIX_LEN: usize = COLLECTION_PREFIX_LEN + std::mem::size_of::<
pub const ACCOUNT_KEY_LEN: usize =
std::mem::size_of::<u32>() + std::mem::size_of::<u8>() + std::mem::size_of::<u32>();
impl Serialize for IndexKey<'_> {
impl<T: AsRef<[u8]>> Serialize for IndexKey<T> {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<IndexKey>() + self.key.len())
let key = self.key.as_ref();
KeySerializer::new(std::mem::size_of::<IndexKey<T>>() + key.len())
.write(self.account_id)
.write(self.collection)
.write(self.field)
.write(self.key)
.write(key)
.write(self.document_id)
.finalize()
}
}
@ -37,14 +42,55 @@ impl Serialize for ValueKey {
}
}
impl Serialize for BitmapKey<'_> {
impl<T: AsRef<[u8]>> Serialize for BitmapKey<T> {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<BitmapKey>() + self.key.len())
.write(self.key)
.write(self.field)
let key = self.key.as_ref();
KeySerializer::new(std::mem::size_of::<BitmapKey<T>>() + key.len())
.write_leb128(self.account_id)
.write(self.collection)
.write(self.family)
.write_leb128(self.account_id)
.write(self.field)
.write(key)
.finalize()
}
}
impl<T: AsRef<[u8]>> Serialize for BlobKey<T> {
fn serialize(self) -> Vec<u8> {
let hash = self.hash.as_ref();
KeySerializer::new(std::mem::size_of::<BlobKey<T>>() + hash.len())
.write(hash)
.write_leb128(self.account_id)
.write(self.collection)
.write_leb128(self.document_id)
.finalize()
}
}
impl Serialize for AclKey {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<AclKey>())
.write_leb128(self.grant_account_id)
.write(u8::MAX)
.write_leb128(self.to_account_id)
.write(self.to_collection)
.write_leb128(self.to_document_id)
.finalize()
}
}
impl Serialize for LogKey {
fn serialize(self) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<LogKey>())
.write(self.account_id)
.write(self.collection)
.write(self.change_id)
.finalize()
}
}
impl From<rocksdb::Error> for crate::Error {
fn from(value: rocksdb::Error) -> Self {
Self::InternalError(format!("RocksDB error: {}", value))
}
}
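
A minimal sketch (not part of the commit) of the byte layout these impls produce for an IndexKey, assuming the big-endian integer encoding from write/key.rs and the crate's types in scope:

// account_id (4, BE) | collection (1) | field (1) | key bytes | document_id (4, BE)
let bytes = IndexKey {
    account_id: 1,
    collection: 0,
    field: 5,
    key: b"water".as_slice(),
    document_id: 42,
}
.serialize();
assert_eq!(bytes.len(), 4 + 1 + 1 + 5 + 4);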

View file

@ -4,21 +4,22 @@ use roaring::RoaringBitmap;
use rocksdb::{Direction, IteratorMode};
use crate::{
query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, IndexKey,
Serialize, Store, ValueKey, BM_DOCUMENT_IDS,
query::Operator, write::key::DeserializeBigEndian, BitmapKey, Deserialize, Error, Serialize,
Store, BM_DOCUMENT_IDS,
};
use super::{CF_BITMAPS, CF_INDEXES, CF_VALUES, FIELD_PREFIX_LEN};
impl Store {
#[inline(always)]
pub fn get_value<U>(&self, key: ValueKey) -> crate::Result<Option<U>>
pub fn get_value<U>(&self, key: impl Serialize) -> crate::Result<Option<U>>
where
U: Deserialize,
{
let key = key.serialize();
if let Some(bytes) = self
.db
.get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key.serialize())
.get_pinned_cf(&self.db.cf_handle(CF_VALUES).unwrap(), &key)
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
Ok(Some(U::deserialize(&bytes).ok_or_else(|| {
@ -30,14 +31,14 @@ impl Store {
}
#[inline(always)]
pub fn get_values<U>(&self, keys: Vec<ValueKey>) -> crate::Result<Vec<Option<U>>>
pub fn get_values<U>(&self, keys: Vec<impl Serialize>) -> crate::Result<Vec<Option<U>>>
where
U: Deserialize,
{
let cf_handle = self.db.cf_handle(CF_VALUES).unwrap();
let mut results = Vec::with_capacity(keys.len());
for value in self.db.multi_get_cf(
keys.iter()
keys.into_iter()
.map(|key| (&cf_handle, key.serialize()))
.collect::<Vec<_>>(),
) {
@ -74,14 +75,18 @@ impl Store {
}
#[inline(always)]
pub fn get_bitmap(&self, key: BitmapKey) -> crate::Result<Option<RoaringBitmap>> {
pub fn get_bitmap<T: AsRef<[u8]>>(
&self,
key: BitmapKey<T>,
) -> crate::Result<Option<RoaringBitmap>> {
let key = key.serialize();
if let Some(bytes) = self
.db
.get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), key.serialize())
.get_pinned_cf(&self.db.cf_handle(CF_BITMAPS).unwrap(), &key)
.map_err(|err| Error::InternalError(format!("get_cf failed: {}", err)))?
{
let bm = RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!("Failed to deserialize key: {:?}", key))
Error::InternalError(format!("Failed to deserialize key: {:?}", &key))
})?;
Ok(if !bm.is_empty() { Some(bm) } else { None })
} else {
@ -90,11 +95,11 @@ impl Store {
}
#[inline(always)]
fn get_bitmaps(&self, keys: Vec<BitmapKey>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
fn get_bitmaps<T: Serialize>(&self, keys: Vec<T>) -> crate::Result<Vec<Option<RoaringBitmap>>> {
let cf_handle = self.db.cf_handle(CF_BITMAPS).unwrap();
let mut results = Vec::with_capacity(keys.len());
for value in self.db.multi_get_cf(
keys.iter()
keys.into_iter()
.map(|key| (&cf_handle, key.serialize()))
.collect::<Vec<_>>(),
) {
@ -116,9 +121,9 @@ impl Store {
Ok(results)
}
pub(crate) fn get_bitmaps_intersection(
pub(crate) fn get_bitmaps_intersection<T: Serialize>(
&self,
keys: Vec<BitmapKey<'_>>,
keys: Vec<T>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for bitmap in self.get_bitmaps(keys)? {
@ -138,9 +143,9 @@ impl Store {
Ok(result)
}
pub(crate) fn get_bitmaps_union(
pub(crate) fn get_bitmaps_union<T: Serialize>(
&self,
keys: Vec<BitmapKey<'_>>,
keys: Vec<T>,
) -> crate::Result<Option<RoaringBitmap>> {
let mut result: Option<RoaringBitmap> = None;
for bitmap in (self.get_bitmaps(keys)?).into_iter().flatten() {
@ -155,21 +160,20 @@ impl Store {
pub(crate) fn range_to_bitmap(
&self,
key: IndexKey<'_>,
match_key: &[u8],
match_value: &[u8],
op: Operator,
) -> crate::Result<Option<RoaringBitmap>> {
let mut bm = RoaringBitmap::new();
let match_key = key.serialize();
let match_prefix = &match_key[0..FIELD_PREFIX_LEN];
let match_value = &match_key[FIELD_PREFIX_LEN..];
for result in self.db.iterator_cf(
&self.db.cf_handle(CF_INDEXES).unwrap(),
IteratorMode::From(
&match_key,
match_key,
match op {
Operator::GreaterThan => Direction::Forward,
Operator::GreaterEqualThan => Direction::Forward,
Operator::Equal => Direction::Forward,
Operator::GreaterThan | Operator::GreaterEqualThan | Operator::Equal => {
Direction::Forward
}
_ => Direction::Reverse,
},
),

View file

@ -0,0 +1,224 @@
use std::time::Instant;
use roaring::RoaringBitmap;
use rocksdb::ErrorKind;
use crate::{
write::{key::KeySerializer, Batch, Operation},
AclKey, BitmapKey, BlobKey, Deserialize, Error, IndexKey, LogKey, Serialize, Store, ValueKey,
BM_BLOOM, BM_DOCUMENT_IDS,
};
use super::{
bitmap::{clear_bit, set_bit},
CF_BITMAPS, CF_BLOBS, CF_INDEXES, CF_LOGS, CF_VALUES,
};
impl Store {
pub fn write(&self, batch: Batch) -> crate::Result<()> {
let cf_values = self.db.cf_handle(CF_VALUES).unwrap();
let cf_bitmaps = self.db.cf_handle(CF_BITMAPS).unwrap();
let cf_indexes = self.db.cf_handle(CF_INDEXES).unwrap();
let cf_logs = self.db.cf_handle(CF_LOGS).unwrap();
let cf_blobs = self.db.cf_handle(CF_BLOBS).unwrap();
let start = Instant::now();
loop {
let mut account_id = u32::MAX;
let mut collection = u8::MAX;
let mut document_id = u32::MAX;
let txn = self.db.transaction();
let mut wb = txn.get_writebatch();
for op in &batch.ops {
match op {
Operation::AccountId {
account_id: account_id_,
} => {
account_id = *account_id_;
}
Operation::Collection {
collection: collection_,
} => {
collection = *collection_;
}
Operation::DocumentId {
document_id: document_id_,
set,
} => {
if *document_id_ == u32::MAX {
let key = BitmapKey {
account_id,
collection,
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: b"",
}
.serialize();
let mut document_ids = if let Some(bytes) = txn
.get_pinned_for_update_cf(&cf_bitmaps, &key, true)
.map_err(|err| {
Error::InternalError(format!("get_cf failed: {}", err))
})? {
RoaringBitmap::deserialize(&bytes).ok_or_else(|| {
Error::InternalError(format!(
"Failed to deserialize key: {:?}",
key
))
})?
} else {
RoaringBitmap::new()
};
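// Lowest-unused-id scan: XOR-ing the used-id bitmap with a mask covering
// 0..=max_id + 1 leaves exactly the unused ids set, so min() yields the
// first free document id (max_id + 1 is always free, so min() is Some).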
document_id = if let Some(max_id) = document_ids.max() {
let mask = if max_id < 20000 {
RoaringBitmap::from_sorted_iter(0..max_id + 2).unwrap()
} else {
RoaringBitmap::full()
};
document_ids ^= mask;
document_ids.min().unwrap()
} else {
0
};
wb.merge_cf(&cf_bitmaps, key, set_bit(document_id));
} else {
document_id = *document_id_;
if !*set {
wb.merge_cf(
&cf_bitmaps,
BitmapKey {
account_id,
collection,
family: BM_DOCUMENT_IDS,
field: u8::MAX,
key: b"",
}
.serialize(),
clear_bit(document_id),
);
}
}
}
Operation::Value { field, set } => {
let key = ValueKey {
account_id,
collection,
document_id,
field: *field,
}
.serialize();
if let Some(value) = set {
wb.put_cf(&cf_values, key, value);
} else {
wb.delete_cf(&cf_values, key);
}
}
Operation::Index { field, key, set } => {
let key_ = IndexKey {
account_id,
collection,
document_id,
field: *field,
key,
}
.serialize();
if *set {
wb.put_cf(&cf_indexes, key_, []);
} else {
wb.delete_cf(&cf_indexes, key_);
}
}
Operation::Bitmap {
family,
field,
key,
set,
} => {
let key = BitmapKey {
account_id,
collection,
family: *family,
field: *field,
key,
}
.serialize();
let value = if *set {
set_bit(document_id)
} else {
clear_bit(document_id)
};
wb.merge_cf(&cf_bitmaps, key, value);
}
Operation::Bloom { family, field, set } => {
let key = KeySerializer::new(std::mem::size_of::<ValueKey>())
.write_leb128(account_id)
.write(collection)
.write_leb128(document_id)
.write(u8::MAX)
.write(BM_BLOOM | *family)
.write(*field)
.finalize();
if let Some(value) = set {
wb.put_cf(&cf_values, key, value);
} else {
wb.delete_cf(&cf_values, key);
}
}
Operation::Blob { key, set } => {
let key = BlobKey {
account_id,
collection,
document_id,
hash: key,
}
.serialize();
if *set {
wb.put_cf(&cf_blobs, key, []);
} else {
wb.delete_cf(&cf_blobs, key);
}
}
Operation::Acl {
grant_account_id,
set,
} => {
let key = AclKey {
grant_account_id: *grant_account_id,
to_account_id: account_id,
to_collection: collection,
to_document_id: document_id,
}
.serialize();
if let Some(value) = set {
wb.put_cf(&cf_values, key, value);
} else {
wb.delete_cf(&cf_values, key);
}
}
Operation::Log { change_id, changes } => {
let key = LogKey {
account_id,
collection,
change_id: *change_id,
}
.serialize();
wb.put_cf(&cf_logs, key, changes);
}
}
}
match self.db.write(wb) {
Ok(_) => {
//println!("Success with id {}", document_id);
return Ok(());
}
Err(err) => match err.kind() {
ErrorKind::Busy | ErrorKind::MergeInProgress | ErrorKind::TryAgain
if start.elapsed().as_secs() < 5 => {}
_ => return Err(err.into()),
},
}
}
}
}
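
For context, a sketch of how a batch reaches this write loop, using the BatchBuilder API introduced further down (db is an opened Store; error handling elided; a sketch, not part of the commit):

// Context ops (AccountId/Collection/DocumentId) scope the mutations that
// follow; the whole batch commits as one RocksDB transaction, retried for up
// to five seconds on Busy / MergeInProgress / TryAgain.
let mut builder = BatchBuilder::new();
builder
    .with_account_id(0)
    .with_collection(0)
    .create_document(); // document_id == u32::MAX asks write() to allocate one
db.write(builder.build())?;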

src/fts/bloom.rs Normal file
View file

@ -0,0 +1,184 @@
use std::{
borrow::Cow,
f64::consts::LN_2,
hash::{Hash, Hasher},
};
use roaring::RoaringBitmap;
use utils::codec::leb128::{Leb128Reader, Leb128Vec};
use crate::{Deserialize, Serialize};
use super::{stemmer::StemmedToken, tokenizers::Token};
pub struct BloomFilter {
m: u64,
b: RoaringBitmap,
}
#[derive(Debug)]
pub struct BloomHash {
pub h: [u64; 7],
}
#[derive(Debug)]
pub struct BloomHashGroup {
pub h1: BloomHash,
pub h2: Option<BloomHash>,
}
const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
0xaf1f2242106c64b3,
0x60ca4cfb4b3ed0ce,
0xc7dbc0bb615e82b3,
0x520ad065378daf88,
);
lazy_static::lazy_static! {
static ref SIPHASHER: siphasher::sip::SipHasher13 =
siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
}
const P: f64 = 0.01;
impl BloomFilter {
pub fn new(items: usize) -> Self {
Self {
m: if items > 0 {
std::cmp::max(Self::estimate_m(items, P), 10240)
} else {
0
},
b: RoaringBitmap::new(),
}
}
fn from_params(m: u64, b: RoaringBitmap) -> Self {
Self { m, b }
}
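// Classic Bloom sizing: m = -n * ln(p) / ln(2)^2 bits, rounded up to a whole
// number of bytes (hence the -8.0 divisor and the * 8); the number of hash
// positions is fixed at 7 (see insert/contains) instead of using estimate_k.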
fn estimate_m(n: usize, p: f64) -> u64 {
(((n as f64) * f64::ln(p) / (-8.0 * LN_2.powi(2))).ceil() as u64) * 8
}
#[allow(dead_code)]
fn estimate_k(m: u64, n: usize) -> u32 {
std::cmp::max(((m as f64) / (n as f64) * f64::ln(2.0f64)).ceil() as u32, 1)
}
pub fn insert(&mut self, hash: &BloomHash) {
self.b.insert((hash.h[0] % self.m) as u32);
self.b.insert((hash.h[1] % self.m) as u32);
self.b.insert((hash.h[2] % self.m) as u32);
self.b.insert((hash.h[3] % self.m) as u32);
self.b.insert((hash.h[4] % self.m) as u32);
self.b.insert((hash.h[5] % self.m) as u32);
self.b.insert((hash.h[6] % self.m) as u32);
}
pub fn contains(&self, hash: &BloomHash) -> bool {
self.b.contains((hash.h[0] % self.m) as u32)
&& self.b.contains((hash.h[1] % self.m) as u32)
&& self.b.contains((hash.h[2] % self.m) as u32)
&& self.b.contains((hash.h[3] % self.m) as u32)
&& self.b.contains((hash.h[4] % self.m) as u32)
&& self.b.contains((hash.h[5] % self.m) as u32)
&& self.b.contains((hash.h[6] % self.m) as u32)
}
pub fn is_subset(&self, other: &Self) -> bool {
self.b.is_subset(&other.b)
}
pub fn is_empty(&self) -> bool {
self.m == 0 || self.b.is_empty()
}
}
impl BloomHash {
pub fn hash<T: Hash + AsRef<[u8]> + ?Sized>(item: &T) -> Self {
let h1 = xxhash_rust::xxh3::xxh3_64(item.as_ref());
let h2 = farmhash::hash64(item.as_ref());
/*let h2 = naive_cityhash::cityhash64_with_seeds(
item.as_ref(),
0x99693e7c5b56f555,
0x34809fd70b6ebf45,
);*/
let h3 = AHASHER.hash_one(item);
let mut sh = *SIPHASHER;
sh.write(item.as_ref());
let h4 = sh.finish();
Self {
h: [h1, h2, h3, h4, h1 ^ h2, h2 ^ h3, h3 ^ h4],
}
}
}
impl From<&str> for BloomHash {
fn from(s: &str) -> Self {
Self::hash(&s)
}
}
impl From<String> for BloomHash {
fn from(s: String) -> Self {
Self::hash(&s)
}
}
impl From<&String> for BloomHash {
fn from(s: &String) -> Self {
Self::hash(&s)
}
}
impl From<Cow<'_, str>> for BloomHash {
fn from(s: Cow<'_, str>) -> Self {
Self::hash(s.as_ref())
}
}
impl From<Token<'_>> for BloomHashGroup {
fn from(t: Token<'_>) -> Self {
Self {
h1: BloomHash::hash(t.word.as_ref()),
h2: None,
}
}
}
impl From<StemmedToken<'_>> for BloomHashGroup {
fn from(t: StemmedToken<'_>) -> Self {
Self {
h1: BloomHash::hash(t.word.as_ref()),
h2: t.stemmed_word.map(|w| BloomHash::hash(&format!("{w}_"))),
}
}
}
impl From<Cow<'_, str>> for BloomHashGroup {
fn from(t: Cow<'_, str>) -> Self {
Self {
h1: BloomHash::hash(t.as_ref()),
h2: None,
}
}
}
impl Serialize for BloomFilter {
fn serialize(self) -> Vec<u8> {
let mut buf = Vec::with_capacity(std::mem::size_of::<u64>() + self.b.serialized_size());
buf.push_leb128(self.m);
let _ = self.b.serialize_into(&mut buf);
buf
}
}
impl Deserialize for BloomFilter {
fn deserialize(bytes: &[u8]) -> Option<Self> {
let (m, pos) = bytes.read_leb128()?;
let b = RoaringBitmap::deserialize_unchecked_from(bytes.get(pos..)?).ok()?;
Some(Self::from_params(m, b))
}
}
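
A usage sketch (assuming the types above are in scope): size the filter for the number of distinct items, insert each hash, then probe. With P = 0.01 a contains() hit can be a false positive, but a miss is definitive:

let words = ["hello", "world"];
let mut filter = BloomFilter::new(words.len());
for word in words {
    filter.insert(&BloomHash::from(word));
}
assert!(filter.contains(&BloomHash::from("hello")));
// A never-inserted word usually fails contains(); false positives occur at
// roughly the configured rate P.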

View file

@ -1,37 +1,37 @@
use std::collections::HashSet;
use std::borrow::Cow;
use ahash::AHashSet;
use crate::{
write::{BatchBuilder, IntoOperations, Operation, Tokenize},
Error, Serialize, BM_TERM, TERM_EXACT, TERM_STEMMED,
write::{BatchBuilder, IntoOperations, Operation},
Serialize, BLOOM_BIGRAM, BLOOM_STEMMED, BLOOM_TRIGRAM, BM_BLOOM,
};
use super::{
bloom::{BloomFilter, BloomHash},
lang::{LanguageDetector, MIN_LANGUAGE_SCORE},
ngram::ToNgrams,
stemmer::Stemmer,
term_index::{TermIndexBuilder, TokenIndex},
Language,
};
pub const MAX_TOKEN_LENGTH: usize = 25;
pub const MAX_TOKEN_LENGTH: usize = 50;
struct Text<'x> {
field: u8,
text: &'x str,
text: Cow<'x, str>,
language: Language,
part_id: u32,
}
pub struct IndexBuilder<'x> {
pub struct FtsIndexBuilder<'x> {
parts: Vec<Text<'x>>,
detect: LanguageDetector,
default_language: Language,
}
impl<'x> IndexBuilder<'x> {
pub fn with_default_language(default_language: Language) -> IndexBuilder<'x> {
IndexBuilder {
impl<'x> FtsIndexBuilder<'x> {
pub fn with_default_language(default_language: Language) -> FtsIndexBuilder<'x> {
FtsIndexBuilder {
parts: vec![],
detect: LanguageDetector::new(),
default_language,
@ -41,30 +41,27 @@ impl<'x> IndexBuilder<'x> {
pub fn index(
&mut self,
field: impl Into<u8>,
text: &'x str,
text: impl Into<Cow<'x, str>>,
mut language: Language,
part_id: u32,
) {
let text = text.into();
if language == Language::Unknown {
language = self.detect.detect(text, MIN_LANGUAGE_SCORE);
language = self.detect.detect(&text, MIN_LANGUAGE_SCORE);
}
self.parts.push(Text {
field: field.into(),
text,
language,
part_id,
});
}
}
impl<'x> IntoOperations for IndexBuilder<'x> {
impl<'x> IntoOperations for FtsIndexBuilder<'x> {
fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> {
let default_language = self
.detect
.most_frequent_language()
.unwrap_or(self.default_language);
let mut term_index = TermIndexBuilder::new();
let mut words = HashSet::new();
for part in &self.parts {
let language = if part.language != Language::Unknown {
@ -72,44 +69,58 @@ impl<'x> IntoOperations for IndexBuilder<'x> {
} else {
default_language
};
let mut unique_words = AHashSet::new();
let mut phrase_words = Vec::new();
let mut terms = Vec::new();
for token in Stemmer::new(part.text, language, MAX_TOKEN_LENGTH) {
words.insert((token.word.as_bytes().to_vec(), part.field, true));
for token in Stemmer::new(&part.text, language, MAX_TOKEN_LENGTH).collect::<Vec<_>>() {
unique_words.insert(token.word.to_string());
if let Some(stemmed_word) = token.stemmed_word.as_ref() {
words.insert((stemmed_word.as_bytes().to_vec(), part.field, false));
unique_words.insert(format!("{}_", stemmed_word));
}
terms.push(term_index.add_stemmed_token(token));
phrase_words.push(token.word);
}
if !terms.is_empty() {
term_index.add_terms(part.field, part.part_id, terms);
let mut bloom_stemmed = BloomFilter::new(unique_words.len());
for word in unique_words {
let hash = BloomHash::from(word);
bloom_stemmed.insert(&hash);
//for h in [0, 1] {
batch.ops.push(Operation::Bitmap {
family: BM_BLOOM,
field: part.field,
key: hash.as_high_rank_hash(0).serialize(),
set: true,
});
//}
}
}
for (key, field, is_exact) in words {
batch.ops.push(Operation::Bitmap {
family: BM_TERM | if is_exact { TERM_EXACT } else { TERM_STEMMED },
field,
key,
set: true,
batch.ops.push(Operation::Bloom {
field: part.field,
family: BLOOM_STEMMED,
set: bloom_stemmed.serialize().into(),
});
}
if !term_index.is_empty() {
batch.ops.push(Operation::Value {
field: u8::MAX,
set: term_index.serialize().into(),
});
if phrase_words.len() > 1 {
batch.ops.push(Operation::Bloom {
field: part.field,
family: BLOOM_BIGRAM,
set: BloomFilter::to_ngrams(&phrase_words, 2).serialize().into(),
});
if phrase_words.len() > 2 {
batch.ops.push(Operation::Bloom {
field: part.field,
family: BLOOM_TRIGRAM,
set: BloomFilter::to_ngrams(&phrase_words, 3).serialize().into(),
});
}
}
}
Ok(())
}
}
/*
impl IntoOperations for TokenIndex {
fn build(self, batch: &mut BatchBuilder) -> crate::Result<()> {
let mut tokens = AHashSet::new();
@ -149,9 +160,4 @@ impl IntoOperations for TokenIndex {
Ok(())
}
}
impl Tokenize for TermIndexBuilder {
fn tokenize(&self) -> HashSet<Vec<u8>> {
unreachable!()
}
}
*/

View file

@ -23,12 +23,17 @@
pub mod lang;
//pub mod pdf;
pub mod index;
pub mod bloom;
pub mod builder;
pub mod ngram;
pub mod query;
pub mod search_snippet;
pub mod stemmer;
pub mod term_index;
pub mod tokenizers;
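// "High rank" bucket count: a word's first hash is reduced modulo this value
// and recorded in a per-field bitmap, giving a cheap candidate prefilter
// before per-document bloom filters are loaded (see fts/query.rs).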
pub const HIGH_RANK_MOD: u64 = 10_240;
#[derive(Debug, PartialEq, Clone, Copy, Hash, Eq, serde::Serialize, serde::Deserialize)]
pub enum Language {
Esperanto = 0,

src/fts/ngram.rs Normal file
View file

@ -0,0 +1,38 @@
use std::borrow::Cow;
use super::bloom::{BloomFilter, BloomHashGroup};
pub trait ToNgrams: Sized {
fn new(items: usize) -> Self;
fn insert(&mut self, item: &str);
fn to_ngrams(tokens: &[Cow<'_, str>], n: usize) -> Self {
let mut filter = Self::new(tokens.len().saturating_sub(1));
for words in tokens.windows(n) {
filter.insert(&words.join(" "));
}
filter
}
}
impl ToNgrams for BloomFilter {
fn new(items: usize) -> Self {
BloomFilter::new(items)
}
fn insert(&mut self, item: &str) {
self.insert(&item.into())
}
}
impl ToNgrams for Vec<BloomHashGroup> {
fn new(items: usize) -> Self {
Vec::with_capacity(items)
}
fn insert(&mut self, item: &str) {
self.push(BloomHashGroup {
h1: item.into(),
h2: None,
})
}
}
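
A sketch of what to_ngrams produces (types from this file): with n = 2, the phrase "the quick brown" yields the windows "the quick" and "quick brown", which is what lets a phrase query verify consecutive word pairs without a positional term index:

let tokens: Vec<Cow<'_, str>> = vec!["the".into(), "quick".into(), "brown".into()];
let bigrams = BloomFilter::to_ngrams(&tokens, 2);
assert!(!bigrams.is_empty());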

src/fts/query.rs Normal file
View file

@ -0,0 +1,204 @@
use std::time::Instant;
use roaring::RoaringBitmap;
use crate::{
fts::{
bloom::{BloomFilter, BloomHash, BloomHashGroup},
builder::MAX_TOKEN_LENGTH,
ngram::ToNgrams,
stemmer::Stemmer,
tokenizers::Tokenizer,
},
write::key::KeySerializer,
BitmapKey, Store, ValueKey, BLOOM_BIGRAM, BLOOM_STEMMED, BLOOM_TRIGRAM, BM_BLOOM,
};
use super::{Language, HIGH_RANK_MOD};
impl Store {
pub(crate) fn fts_query(
&self,
account_id: u32,
collection: u8,
field: u8,
text: &str,
language: Language,
match_phrase: bool,
) -> crate::Result<Option<RoaringBitmap>> {
let real_now = Instant::now();
let (bitmaps, hashes, family) = if match_phrase {
let mut tokens = Vec::new();
let mut bit_keys = Vec::new();
for token in Tokenizer::new(text, language, MAX_TOKEN_LENGTH) {
let hash = BloomHash::from(token.word.as_ref());
let key = hash.to_high_rank_key(account_id, collection, field, 0);
if !bit_keys.contains(&key) {
bit_keys.push(key);
}
tokens.push(token.word);
}
let bitmaps = match self.get_bitmaps_intersection(bit_keys)? {
Some(b) if !b.is_empty() => b,
_ => return Ok(None),
};
match tokens.len() {
0 => (bitmaps, vec![], BLOOM_STEMMED),
1 => (
bitmaps,
vec![tokens.into_iter().next().unwrap().into()],
BLOOM_STEMMED,
),
2 => (
bitmaps,
<Vec<BloomHashGroup>>::to_ngrams(&tokens, 2),
BLOOM_BIGRAM,
),
_ => (
bitmaps,
<Vec<BloomHashGroup>>::to_ngrams(&tokens, 3),
BLOOM_TRIGRAM,
),
}
} else {
let mut hashes = Vec::new();
let mut bitmaps = RoaringBitmap::new();
for token in Stemmer::new(text, language, MAX_TOKEN_LENGTH) {
let hash = BloomHashGroup {
h2: if let Some(stemmed_word) = token.stemmed_word {
Some(format!("{stemmed_word}_").into())
} else {
Some(format!("{}_", token.word).into())
},
h1: token.word.into(),
};
match self.get_bitmaps_union(vec![
hash.h1.to_high_rank_key(account_id, collection, field, 0),
hash.h2
.as_ref()
.unwrap()
.to_high_rank_key(account_id, collection, field, 0),
])? {
Some(b) if !b.is_empty() => {
if !bitmaps.is_empty() {
bitmaps &= b;
if bitmaps.is_empty() {
return Ok(None);
}
} else {
bitmaps = b;
}
}
_ => return Ok(None),
};
hashes.push(hash);
}
(bitmaps, hashes, BLOOM_STEMMED)
};
let b_count = bitmaps.len();
let mut bm = RoaringBitmap::new();
/*let keys = bitmaps
.iter()
.map(|document_id| {
KeySerializer::new(std::mem::size_of::<ValueKey>())
.write_leb128(account_id)
.write(collection)
.write_leb128(document_id)
.write(u8::MAX)
.write(BM_BLOOM | family)
.write(field)
.finalize()
})
.collect::<Vec<_>>();
self.get_values::<BloomFilter>(keys)?
.into_iter()
.zip(bitmaps)
.for_each(|(bloom, document_id)| {
if let Some(bloom) = bloom {
if !bloom.is_empty() {
let mut matched = true;
for hash in &hashes {
if !(bloom.contains(&hash.h1)
|| hash.h2.as_ref().map_or(false, |h2| bloom.contains(h2)))
{
matched = false;
break;
}
}
if matched {
bm.insert(document_id);
}
}
}
});*/
for document_id in bitmaps {
let key = KeySerializer::new(std::mem::size_of::<ValueKey>() + 2)
.write_leb128(account_id)
.write(collection)
.write_leb128(document_id)
.write(u8::MAX)
.write(BM_BLOOM | family)
.write(field)
.finalize();
if let Some(bloom) = self.get_value::<BloomFilter>(key)? {
if !bloom.is_empty() {
let mut matched = true;
for hash in &hashes {
if !(bloom.contains(&hash.h1)
|| hash.h2.as_ref().map_or(false, |h2| bloom.contains(h2)))
{
matched = false;
break;
}
}
if matched {
bm.insert(document_id);
}
}
}
}
println!(
"bloom_match {b_count} items in {:?}ms",
real_now.elapsed().as_millis()
);
Ok(Some(bm))
}
}
impl BloomHash {
#[inline(always)]
pub fn as_high_rank_hash(&self, n: usize) -> u16 {
(self.h[n] % HIGH_RANK_MOD) as u16
}
pub fn to_high_rank_key(
&self,
account_id: u32,
collection: u8,
field: u8,
n: usize,
) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<BitmapKey<&[u8]>>() + 2)
.write_leb128(account_id)
.write(collection)
.write(BM_BLOOM)
.write(field)
.write(self.as_high_rank_hash(n))
.finalize()
}
}
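
A call sketch for the query path above (fts_query is pub(crate), so this only compiles inside the store crate; identifiers are illustrative):

// Phrase mode: intersect each word's high-rank bucket bitmap to get
// candidates, then require every bigram/trigram hash to be present in each
// candidate document's stored bloom filter.
let results = store.fts_query(
    account_id,
    collection,
    field,
    "rustic bridge",
    Language::English,
    true, // match_phrase
)?;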

View file

@ -5,6 +5,9 @@ pub mod fts;
pub mod query;
pub mod write;
#[cfg(test)]
pub mod tests;
pub struct Store {
db: OptimisticTransactionDB<MultiThreaded>,
}
@ -18,20 +21,21 @@ pub trait Serialize {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BitmapKey<'x> {
pub struct BitmapKey<T: AsRef<[u8]>> {
pub account_id: u32,
pub collection: u8,
pub family: u8,
pub field: u8,
pub key: &'x [u8],
pub key: T,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey<'x> {
pub struct IndexKey<T: AsRef<[u8]>> {
pub account_id: u32,
pub collection: u8,
pub document_id: u32,
pub field: u8,
pub key: &'x [u8],
pub key: T,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@ -42,8 +46,32 @@ pub struct ValueKey {
pub field: u8,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct BlobKey<T: AsRef<[u8]>> {
pub account_id: u32,
pub collection: u8,
pub document_id: u32,
pub hash: T,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct AclKey {
pub grant_account_id: u32,
pub to_account_id: u32,
pub to_collection: u8,
pub to_document_id: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct LogKey {
pub account_id: u32,
pub collection: u8,
pub change_id: u64,
}
pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug)]
pub enum Error {
NotFound,
InternalError(String),
@ -52,6 +80,11 @@ pub enum Error {
pub const BM_DOCUMENT_IDS: u8 = 0;
pub const BM_TERM: u8 = 0x10;
pub const BM_TAG: u8 = 0x20;
pub const BM_BLOOM: u8 = 0x40;
pub const BLOOM_STEMMED: u8 = 0x00;
pub const BLOOM_BIGRAM: u8 = 0x01;
pub const BLOOM_TRIGRAM: u8 = 0x02;
pub const TERM_EXACT: u8 = 0x00;
pub const TERM_STEMMED: u8 = 0x01;
@ -61,31 +94,3 @@ pub const TERM_HASH: u8 = 0x04;
pub const TAG_ID: u8 = 0x00;
pub const TAG_TEXT: u8 = 0x01;
pub const TAG_STATIC: u8 = 0x02;
#[cfg(test)]
mod tests {
use rand::Rng;
use roaring::RoaringBitmap;
use super::*;
#[test]
fn it_works() {
let mut rb1 = RoaringBitmap::new();
let mut rb2 = RoaringBitmap::new();
let total = rand::thread_rng().gen_range(0..100000);
println!("total: {}", total);
for num in 0..total {
rb1.insert(rand::thread_rng().gen_range(0..u32::MAX));
rb2.insert(num);
}
println!("sparse: {}", rb1.serialized_size());
println!("compact: {}", rb2.serialized_size());
println!(
"ratio: {}",
rb1.serialized_size() as f64 / rb2.serialized_size() as f64
);
}
}
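
The new family constants compose with the class bits by OR-ing; a quick sketch of the resulting tags:

// BM_BLOOM (0x40) marks the key class; the low bits pick the variant.
assert_eq!(BM_BLOOM | BLOOM_STEMMED, 0x40);
assert_eq!(BM_BLOOM | BLOOM_BIGRAM, 0x41);
assert_eq!(BM_BLOOM | BLOOM_TRIGRAM, 0x42);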

View file

@ -1,17 +1,10 @@
use std::{
borrow::Cow,
ops::{BitAndAssign, BitOrAssign, BitXorAssign},
};
use std::ops::{BitAndAssign, BitOrAssign, BitXorAssign};
use ahash::AHashSet;
use roaring::RoaringBitmap;
use crate::{
fts::{
index::MAX_TOKEN_LENGTH, stemmer::Stemmer, term_index::TermIndex, tokenizers::Tokenizer,
},
write::Tokenize,
BitmapKey, Error, IndexKey, Store, ValueKey, BM_TERM, TERM_EXACT, TERM_STEMMED,
write::{key::KeySerializer, Tokenize},
BitmapKey, IndexKey, Store, BM_TERM, TERM_EXACT,
};
use super::{Filter, ResultSet};
@ -31,6 +24,13 @@ impl Store {
let document_ids = self
.get_document_ids(account_id, collection)?
.unwrap_or_else(RoaringBitmap::new);
if filters.is_empty() {
return Ok(ResultSet {
results: document_ids.clone(),
document_ids,
});
}
let mut state: State = Filter::And.into();
let mut stack = Vec::new();
let mut filters = filters.into_iter().peekable();
@ -62,7 +62,7 @@ impl Store {
collection,
family: BM_TERM | TERM_EXACT,
field,
key,
key: key.as_bytes(),
})
.collect(),
)?,
@ -70,17 +70,16 @@ impl Store {
);
}
Filter::MatchValue { field, op, value } => {
let key =
KeySerializer::new(std::mem::size_of::<IndexKey<&[u8]>>() + value.len())
.write(account_id)
.write(collection)
.write(field)
.write(&value[..])
.finalize();
state.op.apply(
&mut state.bm,
self.range_to_bitmap(
IndexKey {
account_id,
collection,
field,
key: &value,
},
op,
)?,
self.range_to_bitmap(&key, &value, op)?,
&document_ids,
);
}
@ -90,110 +89,18 @@ impl Store {
language,
match_phrase,
} => {
if match_phrase {
let phrase = Tokenizer::new(&text, language, MAX_TOKEN_LENGTH)
.map(|token| token.word)
.collect::<Vec<_>>();
let mut keys = Vec::with_capacity(phrase.len());
for word in &phrase {
let key = BitmapKey {
account_id,
collection,
family: BM_TERM | TERM_EXACT,
field,
key: word.as_bytes(),
};
if !keys.contains(&key) {
keys.push(key);
}
}
// Retrieve the Term Index for each candidate and match the exact phrase
if let Some(candidates) = self.get_bitmaps_intersection(keys)? {
let mut results = RoaringBitmap::new();
for document_id in candidates.iter() {
if let Some(term_index) = self.get_value::<TermIndex>(ValueKey {
account_id,
collection,
document_id,
field: u8::MAX,
})? {
if term_index
.match_terms(
&phrase
.iter()
.map(|w| term_index.get_match_term(w, None))
.collect::<Vec<_>>(),
None,
true,
false,
false,
)
.map_err(|e| {
Error::InternalError(format!(
"Corrupted TermIndex for {}: {:?}",
document_id, e
))
})?
.is_some()
{
results.insert(document_id);
}
}
}
state.op.apply(&mut state.bm, results.into(), &document_ids);
} else {
state.op.apply(&mut state.bm, None, &document_ids);
}
} else {
let words = Stemmer::new(&text, language, MAX_TOKEN_LENGTH)
.map(|token| (token.word, token.stemmed_word.unwrap_or(Cow::from(""))))
.collect::<AHashSet<_>>();
let mut requested_keys = AHashSet::default();
let mut text_bitmap = None;
for (word, stemmed_word) in &words {
let mut keys = Vec::new();
for (word, family) in [
(word, BM_TERM | TERM_EXACT),
(word, BM_TERM | TERM_STEMMED),
(stemmed_word, BM_TERM | TERM_EXACT),
(stemmed_word, BM_TERM | TERM_STEMMED),
] {
if !word.is_empty() {
let key = BitmapKey {
account_id,
collection,
family,
field,
key: word.as_bytes(),
};
if !requested_keys.contains(&key) {
requested_keys.insert(key);
keys.push(key);
}
}
}
// Term already matched on a previous iteration
if keys.is_empty() {
continue;
}
Filter::And.apply(
&mut text_bitmap,
self.get_bitmaps_union(keys)?,
&document_ids,
);
if text_bitmap.as_ref().unwrap().is_empty() {
break;
}
}
state.op.apply(&mut state.bm, text_bitmap, &document_ids);
}
state.op.apply(
&mut state.bm,
self.fts_query(
account_id,
collection,
field,
&text,
language,
match_phrase,
)?,
&document_ids,
);
}
Filter::InBitmap { family, field, key } => {
state.op.apply(
@ -228,6 +135,8 @@ impl Store {
}
}
//println!("{:?}: {:?}", state.op, state.bm);
if matches!(state.op, Filter::And) && state.bm.as_ref().unwrap().is_empty() {
while let Some(filter) = filters.peek() {
if matches!(filter, Filter::End) {

View file

@ -56,6 +56,7 @@ pub enum Comparator {
DocumentSet { set: RoaringBitmap, ascending: bool },
}
#[derive(Debug)]
pub struct ResultSet {
results: RoaringBitmap,
document_ids: RoaringBitmap,
@ -68,7 +69,7 @@ pub struct SortedResultRet {
}
impl Filter {
pub fn new_condition(field: impl Into<u8>, op: Operator, value: impl Serialize) -> Self {
pub fn cond(field: impl Into<u8>, op: Operator, value: impl Serialize) -> Self {
Filter::MatchValue {
field: field.into(),
op,
@ -116,7 +117,26 @@ impl Filter {
}
}
pub fn match_text(field: impl Into<u8>, mut text: String, mut language: Language) -> Self {
pub fn has_keyword(field: impl Into<u8>, value: impl Into<String>) -> Self {
Filter::HasKeyword {
field: field.into(),
value: value.into(),
}
}
pub fn has_keywords(field: impl Into<u8>, value: impl Into<String>) -> Self {
Filter::HasKeywords {
field: field.into(),
value: value.into(),
}
}
pub fn match_text(
field: impl Into<u8>,
text: impl Into<String>,
mut language: Language,
) -> Self {
let mut text = text.into();
let match_phrase = (text.starts_with('"') && text.ends_with('"'))
|| (text.starts_with('\'') && text.ends_with('\''));
@ -141,6 +161,11 @@ impl Filter {
match_phrase,
}
}
#[cfg(test)]
pub fn match_english(field: impl Into<u8>, text: impl Into<String>) -> Self {
Self::match_text(field, text, Language::English)
}
}
impl Comparator {

src/tests/mod.rs Normal file
View file

@ -0,0 +1,85 @@
pub mod query;
use std::{collections::BTreeSet, f64::consts::LN_2, io::Read, time::Instant};
use bitpacking::{BitPacker, BitPacker4x, BitPacker8x};
use rand::Rng;
use roaring::RoaringBitmap;
use crate::fts::{
bloom::BloomFilter,
stemmer::{StemmedToken, Stemmer},
Language,
};
use super::*;
pub fn deflate_artwork_data() -> Vec<u8> {
let mut csv_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
csv_path.push("src");
csv_path.push("tests");
csv_path.push("resources");
csv_path.push("artwork_data.csv.gz");
let mut decoder = flate2::bufread::GzDecoder::new(std::io::BufReader::new(
std::fs::File::open(csv_path).unwrap(),
));
let mut result = Vec::new();
decoder.read_to_end(&mut result).unwrap();
result
}
#[test]
fn it_works() {
for n in [10, 100, 1000, 5000, 10000, 100000] {
let mut rb1 = RoaringBitmap::new();
let mut h = BTreeSet::new();
let m = (((n as f64) * f64::ln(0.01) / (-8.0 * LN_2.powi(2))).ceil() as u64) * 8;
for pos in 0..(n * 7_usize) {
let num = rand::thread_rng().gen_range(0..m as u32);
rb1.insert(num);
h.insert(num);
}
let mut compressed = vec![0u8; 4 * BitPacker8x::BLOCK_LEN];
let mut bitpacker = BitPacker8x::new();
let mut initial_value = 0;
let mut bytes = vec![];
for chunk in h
.into_iter()
.collect::<Vec<_>>()
.chunks_exact(BitPacker8x::BLOCK_LEN)
{
let num_bits: u8 = bitpacker.num_bits_sorted(initial_value, chunk);
let compressed_len =
bitpacker.compress_sorted(initial_value, chunk, &mut compressed[..], num_bits);
initial_value = chunk[chunk.len() - 1];
//println!("{:?} {}", compressed_len, num_bits);
bytes.push(num_bits);
bytes.extend_from_slice(&compressed[..compressed_len]);
}
let rb_size = rb1.serialized_size();
let bp_size = bytes.len();
if rb_size < bp_size {
println!("For {} Roaring is better {} vs {}", n, rb_size, bp_size);
} else {
println!("For {} BitPack is better {} vs {}", n, bp_size, rb_size);
}
let now = Instant::now();
let mut ser = Vec::with_capacity(rb_size);
rb1.serialize_into(&mut ser).unwrap();
println!("Roaring serialization took {:?}", now.elapsed().as_millis());
let now = Instant::now();
let deser = RoaringBitmap::deserialize_unchecked_from(&ser[..]).unwrap();
println!(
"Roaring deserialization took {:?}",
now.elapsed().as_millis()
);
}
/*println!(
"ratio: {}",
rb1.serialized_size() as f64 / rb2.serialized_size() as f64
);*/
}

src/tests/query.rs Normal file
View file

@ -0,0 +1,441 @@
/*
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
*
* This file is part of the Stalwart JMAP Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::{
sync::{Arc, Mutex},
time::Instant,
};
use ahash::AHashMap;
use crate::{
fts::{builder::FtsIndexBuilder, Language},
query::{Comparator, Filter},
tests::deflate_artwork_data,
write::{BatchBuilder, IntoBitmap, F_INDEX, F_TOKENIZE, F_VALUE},
Store, ValueKey, BM_DOCUMENT_IDS,
};
pub const FIELDS: [&str; 20] = [
"id",
"accession_number",
"artist",
"artistRole",
"artistId",
"title",
"dateText",
"medium",
"creditLine",
"year",
"acquisitionYear",
"dimensions",
"width",
"height",
"depth",
"units",
"inscription",
"thumbnailCopyright",
"thumbnailUrl",
"url",
];
const COLLECTION_ID: u8 = 0;
enum FieldType {
Keyword,
Text,
FullText,
Integer,
}
const FIELDS_OPTIONS: [FieldType; 20] = [
FieldType::Integer, // "id",
FieldType::Keyword, // "accession_number",
FieldType::Text, // "artist",
FieldType::Keyword, // "artistRole",
FieldType::Integer, // "artistId",
FieldType::FullText, // "title",
FieldType::FullText, // "dateText",
FieldType::FullText, // "medium",
FieldType::FullText, // "creditLine",
FieldType::Integer, // "year",
FieldType::Integer, // "acquisitionYear",
FieldType::FullText, // "dimensions",
FieldType::Integer, // "width",
FieldType::Integer, // "height",
FieldType::Integer, // "depth",
FieldType::Text, // "units",
FieldType::FullText, // "inscription",
FieldType::Text, // "thumbnailCopyright",
FieldType::Text, // "thumbnailUrl",
FieldType::Text, // "url",
];
#[test]
pub fn db_test() {
let db = Store::open().unwrap();
test(&db, false);
}
#[allow(clippy::mutex_atomic)]
pub fn test(db: &Store, do_insert: bool) {
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(8)
.build()
.unwrap();
let now = Instant::now();
let documents = Arc::new(Mutex::new(Vec::new()));
if do_insert {
pool.scope_fifo(|s| {
for (document_id, record) in csv::ReaderBuilder::new()
.has_headers(true)
.from_reader(&deflate_artwork_data()[..])
.records()
.enumerate()
{
let record = record.unwrap();
let documents = documents.clone();
s.spawn_fifo(move |_| {
let mut fts_builder = FtsIndexBuilder::with_default_language(Language::English);
let mut builder = BatchBuilder::new();
builder
.with_account_id(0)
.with_collection(COLLECTION_ID)
.update_document(document_id as u32) // Speed up insertion by manually assigning id
.bitmap(u8::MAX, (), 0);
for (pos, field) in record.iter().enumerate() {
let field_id = pos as u8;
match FIELDS_OPTIONS[pos] {
FieldType::Text => {
if !field.is_empty() {
builder.value(
field_id,
field.to_lowercase(),
F_VALUE | F_TOKENIZE,
);
}
}
FieldType::FullText => {
if !field.is_empty() {
fts_builder.index(
field_id,
field.to_lowercase(),
Language::English,
);
if field_id == 7 {
builder.value(field_id, field.to_lowercase(), F_INDEX);
}
}
}
FieldType::Integer => {
builder.value(
field_id,
field.parse::<u32>().unwrap_or(0),
F_VALUE | F_INDEX,
);
}
FieldType::Keyword => {
if !field.is_empty() {
builder.value(
field_id,
field.to_lowercase(),
F_VALUE | F_INDEX | F_TOKENIZE,
);
}
}
}
}
builder.custom(fts_builder).unwrap();
documents.lock().unwrap().push(builder.build());
});
}
});
println!(
"Parsed {} entries in {} ms.",
documents.lock().unwrap().len(),
now.elapsed().as_millis()
);
let db_ = Arc::new(db);
let now = Instant::now();
pool.scope_fifo(|s| {
let mut documents = documents.lock().unwrap();
for document in documents.drain(..) {
let db = db_.clone();
s.spawn_fifo(move |_| {
db.write(document).unwrap();
});
}
});
println!("Insert took {} ms.", now.elapsed().as_millis());
}
println!("Running filter tests...");
test_filter(db);
println!("Running sort tests...");
test_sort(db);
}
impl IntoBitmap for () {
fn into_bitmap(self) -> (Vec<u8>, u8) {
(vec![], BM_DOCUMENT_IDS)
}
}
pub fn test_filter(db: &Store) {
let mut fields = AHashMap::default();
for (field_num, field) in FIELDS.iter().enumerate() {
fields.insert(field.to_string(), field_num as u8);
}
let tests = [
(
vec![
Filter::match_english(fields["title"], "water"),
Filter::eq(fields["year"], 1979u32),
],
vec!["p11293"],
),
(
vec![
Filter::match_english(fields["medium"], "gelatin"),
Filter::gt(fields["year"], 2000u32),
Filter::lt(fields["width"], 180u32),
Filter::gt(fields["width"], 0u32),
],
vec!["p79426", "p79427", "p79428", "p79429", "p79430"],
),
(
vec![Filter::match_english(fields["title"], "'rustic bridge'")],
vec!["d05503"],
),
(
vec![
Filter::match_english(fields["title"], "'rustic'"),
Filter::match_english(fields["title"], "study"),
],
vec!["d00399", "d05352"],
),
(
vec![
Filter::has_keywords(fields["artist"], "mauro kunst"),
Filter::has_keyword(fields["artistRole"], "artist"),
Filter::Or,
Filter::eq(fields["year"], 1969u32),
Filter::eq(fields["year"], 1971u32),
Filter::End,
],
vec!["p01764", "t05843"],
),
(
vec![
Filter::Not,
Filter::match_english(fields["medium"], "oil"),
Filter::End,
Filter::match_english(fields["creditLine"], "bequeath"),
Filter::Or,
Filter::And,
Filter::ge(fields["year"], 1900u32),
Filter::lt(fields["year"], 1910u32),
Filter::End,
Filter::And,
Filter::ge(fields["year"], 2000u32),
Filter::lt(fields["year"], 2010u32),
Filter::End,
Filter::End,
],
vec![
"n02478", "n02479", "n03568", "n03658", "n04327", "n04328", "n04721", "n04739",
"n05095", "n05096", "n05145", "n05157", "n05158", "n05159", "n05298", "n05303",
"n06070", "t01181", "t03571", "t05805", "t05806", "t12147", "t12154", "t12155",
],
),
(
vec![
Filter::And,
Filter::has_keyword(fields["artist"], "warhol"),
Filter::Not,
Filter::match_english(fields["title"], "'campbell'"),
Filter::End,
Filter::Not,
Filter::Or,
Filter::gt(fields["year"], 1980u32),
Filter::And,
Filter::gt(fields["width"], 500u32),
Filter::gt(fields["height"], 500u32),
Filter::End,
Filter::End,
Filter::End,
Filter::eq(fields["acquisitionYear"], 2008u32),
Filter::End,
],
vec!["ar00039", "t12600"],
),
(
vec![
Filter::match_english(fields["title"], "study"),
Filter::match_english(fields["medium"], "paper"),
Filter::match_english(fields["creditLine"], "'purchased'"),
Filter::Not,
Filter::match_english(fields["title"], "'anatomical'"),
Filter::match_english(fields["title"], "'for'"),
Filter::End,
Filter::gt(fields["year"], 1900u32),
Filter::gt(fields["acquisitionYear"], 2000u32),
],
vec![
"p80042", "p80043", "p80044", "p80045", "p80203", "t11937", "t12172",
],
),
];
for (filter, expected_results) in tests {
println!("Running test: {:?}", filter);
let mut results: Vec<String> = Vec::with_capacity(expected_results.len());
let docset = db.filter(0, COLLECTION_ID, filter).unwrap();
let sorted_docset = db
.sort(
0,
COLLECTION_ID,
docset,
vec![Comparator::ascending(fields["accession_number"])],
0,
0,
None,
0,
)
.unwrap();
for document_id in sorted_docset.ids {
results.push(
db.get_value(ValueKey {
account_id: 0,
collection: COLLECTION_ID,
document_id,
field: fields["accession_number"],
})
.unwrap()
.unwrap(),
);
}
assert_eq!(results, expected_results);
}
}
pub fn test_sort(db: &Store) {
let mut fields = AHashMap::default();
for (field_num, field) in FIELDS.iter().enumerate() {
fields.insert(field.to_string(), field_num as u8);
}
let tests = [
(
vec![
Filter::gt(fields["year"], 0u32),
Filter::gt(fields["acquisitionYear"], 0u32),
Filter::gt(fields["width"], 0u32),
],
vec![
Comparator::descending(fields["year"]),
Comparator::ascending(fields["acquisitionYear"]),
Comparator::ascending(fields["width"]),
Comparator::descending(fields["accession_number"]),
],
vec![
"t13655", "t13811", "p13352", "p13351", "p13350", "p13349", "p13348", "p13347",
"p13346", "p13345", "p13344", "p13342", "p13341", "p13340", "p13339", "p13338",
"p13337", "p13336", "p13335", "p13334", "p13333", "p13332", "p13331", "p13330",
"p13329", "p13328", "p13327", "p13326", "p13325", "p13324", "p13323", "t13786",
"p13322", "p13321", "p13320", "p13319", "p13318", "p13317", "p13316", "p13315",
"p13314", "t13588", "t13587", "t13586", "t13585", "t13584", "t13540", "t13444",
"ar01154", "ar01153",
],
),
(
vec![
Filter::gt(fields["width"], 0u32),
Filter::gt(fields["height"], 0u32),
],
vec![
Comparator::descending(fields["width"]),
Comparator::ascending(fields["height"]),
],
vec![
"t03681", "t12601", "ar00166", "t12625", "t12915", "p04182", "t06483", "ar00703",
"t07671", "ar00021", "t05557", "t07918", "p06298", "p05465", "p06640", "t12855",
"t01355", "t12800", "t12557", "t02078",
],
),
(
vec![],
vec![
Comparator::descending(fields["medium"]),
Comparator::descending(fields["artistRole"]),
Comparator::ascending(fields["accession_number"]),
],
vec![
"ar00627", "ar00052", "t00352", "t07275", "t12318", "t04931", "t13683", "t13686",
"t13687", "t13688", "t13689", "t13690", "t13691", "t07766", "t07918", "t12993",
"ar00044", "t13326", "t07614", "t12414",
],
),
];
for (filter, sort, expected_results) in tests {
let mut results: Vec<String> = Vec::with_capacity(expected_results.len());
let docset = db.filter(0, COLLECTION_ID, filter).unwrap();
let sorted_docset = db
.sort(
0,
COLLECTION_ID,
docset,
sort,
expected_results.len(),
0,
None,
0,
)
.unwrap();
for document_id in sorted_docset.ids {
results.push(
db.get_value(ValueKey {
account_id: 0,
collection: COLLECTION_ID,
document_id,
field: fields["accession_number"],
})
.unwrap()
.unwrap(),
);
}
assert_eq!(results, expected_results);
}
}

Binary file not shown.

View file

@ -9,32 +9,45 @@ impl BatchBuilder {
pub fn new() -> Self {
Self {
ops: Vec::new(),
last_account_id: 0,
last_document_id: 0,
last_collection: 0,
}
}
pub fn with_context(
&mut self,
account_id: u32,
document_id: u32,
collection: impl Into<u8>,
) -> &mut Self {
self.last_account_id = account_id;
self.last_document_id = document_id;
self.last_collection = collection.into();
self.push_context();
pub fn with_account_id(&mut self, account_id: u32) -> &mut Self {
self.ops.push(Operation::AccountId { account_id });
self
}
#[inline(always)]
pub(super) fn push_context(&mut self) {
self.ops.push(Operation::WithContext {
account_id: self.last_account_id,
document_id: self.last_document_id,
pub fn with_collection(&mut self, collection: impl Into<u8>) -> &mut Self {
self.last_collection = collection.into();
self.ops.push(Operation::Collection {
collection: self.last_collection,
});
self
}
pub fn create_document(&mut self) -> &mut Self {
self.ops.push(Operation::DocumentId {
document_id: u32::MAX,
set: true,
});
self
}
pub fn update_document(&mut self, document_id: u32) -> &mut Self {
self.ops.push(Operation::DocumentId {
document_id,
set: true,
});
self
}
pub fn delete_document(&mut self, document_id: u32) -> &mut Self {
self.ops.push(Operation::DocumentId {
document_id,
set: false,
});
self
}
pub fn value(
@ -51,7 +64,7 @@ impl BatchBuilder {
self.ops.push(Operation::Bitmap {
family: BM_TERM | TERM_EXACT,
field,
key: token,
key: token.into_bytes(),
set: is_set,
});
}
@ -87,9 +100,9 @@ impl BatchBuilder {
});
}
pub fn acl(&mut self, to_account_id: u32, acl: Option<impl Serialize>) {
pub fn acl(&mut self, grant_account_id: u32, acl: Option<impl Serialize>) {
self.ops.push(Operation::Acl {
to_account_id,
grant_account_id,
set: acl.map(|acl| acl.serialize()),
})
}

View file

@ -66,6 +66,12 @@ impl KeySerialize for u32 {
}
}
impl KeySerialize for u16 {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(&self.to_be_bytes());
}
}
impl KeySerialize for u64 {
fn serialize(&self, buf: &mut Vec<u8>) {
buf.extend_from_slice(&self.to_be_bytes());

View file

@ -71,7 +71,7 @@ impl IntoOperations for ChangeLogBuilder {
for (collection, changes) in self.changes {
if collection != batch.last_collection {
batch.last_collection = collection;
batch.push_context();
batch.ops.push(Operation::Collection { collection });
}
batch.ops.push(Operation::Log {

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use crate::Serialize;
use crate::{Deserialize, Serialize};
pub mod batch;
pub mod key;
@ -16,18 +16,21 @@ pub struct Batch {
}
pub struct BatchBuilder {
pub last_account_id: u32,
pub last_document_id: u32,
pub last_collection: u8,
pub ops: Vec<Operation>,
}
pub enum Operation {
WithContext {
AccountId {
account_id: u32,
document_id: u32,
},
Collection {
collection: u8,
},
DocumentId {
document_id: u32,
set: bool,
},
Value {
field: u8,
set: Option<Vec<u8>>,
@ -43,12 +46,17 @@ pub enum Operation {
key: Vec<u8>,
set: bool,
},
Bloom {
field: u8,
family: u8,
set: Option<Vec<u8>>,
},
Blob {
key: Vec<u8>,
set: bool,
},
Acl {
to_account_id: u32,
grant_account_id: u32,
set: Option<Vec<u8>>,
},
Log {
@ -69,6 +77,12 @@ impl Serialize for u64 {
}
}
impl Serialize for u16 {
fn serialize(self) -> Vec<u8> {
self.to_be_bytes().to_vec()
}
}
impl Serialize for f64 {
fn serialize(self) -> Vec<u8> {
self.to_be_bytes().to_vec()
@ -87,6 +101,18 @@ impl Serialize for String {
}
}
impl Serialize for Vec<u8> {
fn serialize(self) -> Vec<u8> {
self
}
}
impl Deserialize for String {
fn deserialize(bytes: &[u8]) -> Option<Self> {
String::from_utf8_lossy(bytes).into_owned().into()
}
}
trait HasFlag {
fn has_flag(&self, flag: u32) -> bool;
}
@ -99,11 +125,11 @@ impl HasFlag for u32 {
}
pub trait Tokenize {
fn tokenize(&self) -> HashSet<Vec<u8>>;
fn tokenize(&self) -> HashSet<String>;
}
impl Tokenize for &str {
fn tokenize(&self) -> HashSet<Vec<u8>> {
fn tokenize(&self) -> HashSet<String> {
let mut tokens = HashSet::new();
let mut token = String::new();
@ -115,35 +141,39 @@ impl Tokenize for &str {
token.push(ch);
}
} else if !token.is_empty() {
tokens.insert(token.into_bytes());
tokens.insert(token);
token = String::new();
}
}
if !token.is_empty() {
tokens.insert(token);
}
tokens
}
}
impl Tokenize for String {
fn tokenize(&self) -> HashSet<Vec<u8>> {
fn tokenize(&self) -> HashSet<String> {
self.as_str().tokenize()
}
}
impl Tokenize for u32 {
fn tokenize(&self) -> HashSet<Vec<u8>> {
fn tokenize(&self) -> HashSet<String> {
unreachable!()
}
}
impl Tokenize for u64 {
fn tokenize(&self) -> HashSet<Vec<u8>> {
fn tokenize(&self) -> HashSet<String> {
unreachable!()
}
}
impl Tokenize for f64 {
fn tokenize(&self) -> HashSet<Vec<u8>> {
fn tokenize(&self) -> HashSet<String> {
unreachable!()
}
}
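
A sketch of the new String-based Tokenize behavior (a rough reading of the loop above: alphanumeric runs become tokens and duplicates collapse in the HashSet):

use crate::write::Tokenize;

let tokens = "hello world, hello".tokenize();
assert_eq!(tokens.len(), 2); // {"hello", "world"}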