mirror of https://github.com/stalwartlabs/mail-server.git
FTS store bigrams in kv rather than index

commit a7acc67cf1
parent 7e94a08067

8 changed files with 203 additions and 117 deletions

@@ -227,11 +227,11 @@ impl JMAP {
             .await
             .failed("Unable to open blob store"),
         ));*/
-        //let fts_store = FtsStore::Store(store.clone());
-        let fts_store = ElasticSearchStore::open(config)
-            .await
-            .failed("Unable to open FTS store")
-            .into();
+        let fts_store = FtsStore::Store(store.clone());
+        /*let fts_store = ElasticSearchStore::open(config)
+            .await
+            .failed("Unable to open FTS store")
+            .into();*/

         let jmap_server = Arc::new(JMAP {
             directory: directory_config
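
The hunk above flips the development default in the JMAP server setup: the ElasticSearch block is commented out, and FtsStore::Store(store.clone()) serves full-text search from the same key-value store that backs everything else. The bigram changes in the hunks below are what make that internal backend handle phrase matches without a dedicated bigram index.
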
@@ -1,3 +1,26 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
 use std::{borrow::Cow, fmt::Display};

 use elasticsearch::{DeleteByQueryParts, IndexParts};
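
This hunk and the next two only prepend the standard Stalwart AGPLv3 license header to three ElasticSearch backend modules; judging by the imports in the context lines, they are the indexing, connection, and query modules respectively.
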
@@ -1,3 +1,26 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
 use elasticsearch::{
     auth::Credentials,
     cert::CertificateValidation,
@@ -1,3 +1,26 @@
+/*
+ * Copyright (c) 2023 Stalwart Labs Ltd.
+ *
+ * This file is part of the Stalwart Mail Server.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ * in the LICENSE file at the top-level directory of this distribution.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * You can be released from the requirements of the AGPLv3 license by
+ * purchasing a commercial license. Please contact licensing@stalw.art
+ * for more details.
+*/
+
 use std::{borrow::Cow, fmt::Display};

 use elasticsearch::SearchParts;
@@ -21,7 +21,7 @@
  * for more details.
  */

-use std::{borrow::Cow, fmt::Display};
+use std::{borrow::Cow, collections::BTreeSet, fmt::Display};

 use ahash::{AHashMap, AHashSet};
 use nlp::{
@@ -32,6 +32,7 @@ use nlp::{
     },
     tokenizers::word::WordTokenizer,
 };
+use utils::codec::leb128::Leb128Reader;

 use crate::{
     backend::MAX_TOKEN_LENGTH,
@@ -170,6 +171,7 @@ impl Store {
         let default_language = detect
             .most_frequent_language()
             .unwrap_or(document.default_language);
+        let mut bigrams = BTreeSet::new();

         for (field, language, text) in parts.into_iter() {
             let language = if language != Language::Unknown {
@@ -182,10 +184,7 @@ impl Store {
             let mut last_token = Cow::Borrowed("");
             for token in Stemmer::new(&text, language, MAX_TOKEN_LENGTH) {
                 if !last_token.is_empty() {
-                    tokens
-                        .entry(BitmapHash::new(&format!("{} {}", last_token, token.word)))
-                        .or_default()
-                        .insert(TokenType::bigram(field));
+                    bigrams.insert(BitmapHash::new(&format!("{} {}", last_token, token.word)).hash);
                 }

                 tokens
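
With the two hunks above, indexing no longer emits a bitmap-index entry per bigram (the old tokens.entry(...).insert(TokenType::bigram(field)) path). Each pair of consecutive tokens is hashed and collected into a BTreeSet instead, which both deduplicates the 8-byte hashes and keeps them sorted, so the query path can binary-search them later. A minimal self-contained sketch of that collection step, with hash8 as a stand-in for BitmapHash::new(...).hash:

    use std::borrow::Cow;
    use std::collections::BTreeSet;
    use std::hash::{Hash, Hasher};

    fn collect_bigrams<'a>(words: impl Iterator<Item = Cow<'a, str>>) -> BTreeSet<[u8; 8]> {
        let mut bigrams = BTreeSet::new();
        let mut last: Cow<'a, str> = Cow::Borrowed("");
        for word in words {
            if !last.is_empty() {
                // Hash the pair "last word", exactly as the diff formats it.
                bigrams.insert(hash8(&format!("{} {}", last, word)));
            }
            last = word;
        }
        bigrams
    }

    // Stand-in for BitmapHash::new(...).hash: any stable 8-byte digest works here.
    fn hash8(s: &str) -> [u8; 8] {
        let mut h = std::collections::hash_map::DefaultHasher::new();
        s.hash(&mut h);
        h.finish().to_le_bytes()
    }
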
@@ -212,6 +211,13 @@ impl Store {
         let mut serializer = KeySerializer::new(tokens.len() * U64_LEN * 2);
         let mut keys = Vec::with_capacity(tokens.len());

+        // Write bigrams
+        serializer = serializer.write_leb128(bigrams.len());
+        for bigram in bigrams {
+            serializer = serializer.write(bigram.as_slice());
+        }
+
+        // Write index keys
         for (hash, fields) in tokens.into_iter() {
             serializer = serializer
                 .write(hash.hash.as_slice())
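
The serialized term-index value now leads with the bigram data: a LEB128 item count, then the sorted 8-byte hashes back to back, then the existing per-token entries. Readers that only need tokens can skip the prefix in one step once the count is decoded. A sketch of the prefix writer, assuming the usual unsigned LEB128 varint that utils::codec::leb128 appears to implement:

    use std::collections::BTreeSet;

    // Layout: leb128(count) | count * 8 bytes of sorted bigram hashes | token entries...
    fn write_bigram_prefix(buf: &mut Vec<u8>, bigrams: &BTreeSet<[u8; 8]>) {
        let mut n = bigrams.len();
        loop {
            let mut byte = (n & 0x7f) as u8;
            n >>= 7;
            if n != 0 {
                byte |= 0x80; // continuation bit
            }
            buf.push(byte);
            if n == 0 {
                break;
            }
        }
        for gram in bigrams {
            // BTreeSet iteration is ascending, so the hashes land already sorted.
            buf.extend_from_slice(gram);
        }
    }
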
@@ -336,7 +342,21 @@ impl Deserialize for TermIndex {
         let bytes = lz4_flex::decompress_size_prepended(bytes)
             .map_err(|_| Error::InternalError("Failed to decompress term index".to_string()))?;
         let mut ops = Vec::new();
-        let mut bytes = bytes.iter().peekable();
+
+        // Skip bigrams
+        let (num_items, pos) =
+            bytes
+                .as_slice()
+                .read_leb128::<usize>()
+                .ok_or(Error::InternalError(
+                    "Failed to read term index marker".to_string(),
+                ))?;
+
+        let mut bytes = bytes
+            .get(pos + (num_items * 8)..)
+            .unwrap_or_default()
+            .iter()
+            .peekable();

         while bytes.peek().is_some() {
             let mut hash = BitmapHash {
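
TermIndex::deserialize only cares about the token entries, so after decompression it decodes the bigram count and jumps pos + num_items * 8 bytes ahead. A self-contained equivalent of that skip, where read_leb128 mirrors the (value, bytes_read) pair returned by the utils crate's Leb128Reader:

    fn skip_bigram_prefix(bytes: &[u8]) -> Option<&[u8]> {
        let (count, pos) = read_leb128(bytes)?;
        bytes.get(pos + count * 8..)
    }

    fn read_leb128(bytes: &[u8]) -> Option<(usize, usize)> {
        let (mut value, mut pos) = (0usize, 0usize);
        loop {
            let byte = *bytes.get(pos)?;
            value |= ((byte & 0x7f) as usize) << (7 * pos);
            pos += 1;
            if byte & 0x80 == 0 {
                return Some((value, pos));
            }
        }
    }
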
@@ -22,20 +22,32 @@
  */

 use std::{
     borrow::Cow,
     fmt::Display,
     ops::{BitAndAssign, BitOrAssign, BitXorAssign},
 };

+use ahash::AHashSet;
 use nlp::language::stemmer::Stemmer;
 use roaring::RoaringBitmap;
+use utils::codec::leb128::Leb128Reader;

-use crate::{backend::MAX_TOKEN_LENGTH, fts::FtsFilter, write::BitmapClass, BitmapKey, Store};
+use crate::{
+    backend::MAX_TOKEN_LENGTH,
+    fts::FtsFilter,
+    write::{BitmapClass, BitmapHash, ValueClass},
+    BitmapKey, Deserialize, Error, Store, ValueKey,
+};

 struct State<T: Into<u8> + Display + Clone + std::fmt::Debug> {
     pub op: FtsFilter<T>,
     pub bm: Option<RoaringBitmap>,
 }

+struct BigramIndex {
+    grams: Vec<[u8; 8]>,
+}
+
 impl Store {
     pub async fn fts_query<T: Into<u8> + Display + Clone + std::fmt::Debug>(
         &self,
@@ -59,34 +71,60 @@ impl Store {
                 language,
             } => {
                 let field: u8 = field.clone().into();
-                let tokens = language
-                    .tokenize_text(text.as_ref(), MAX_TOKEN_LENGTH)
-                    .map(|t| t.word)
-                    .collect::<Vec<_>>();
-                let keys = if tokens.len() > 1 {
-                    tokens
-                        .windows(2)
-                        .map(|bg| BitmapKey {
-                            account_id,
-                            collection,
-                            class: BitmapClass::bigram(format!("{} {}", bg[0], bg[1]), field),
-                            block_num: 0,
-                        })
-                        .collect::<Vec<_>>()
-                } else {
-                    tokens
-                        .into_iter()
-                        .map(|word| BitmapKey {
-                            account_id,
-                            collection,
-                            class: BitmapClass::word(word.as_ref(), field),
-                            block_num: 0,
-                        })
-                        .collect::<Vec<_>>()
-                };
-
-                self.get_bitmaps_intersection(keys).await?
+                let mut keys = Vec::new();
+                let mut bigrams = AHashSet::new();
+                let mut last_token = Cow::Borrowed("");
+                for token in language.tokenize_text(text.as_ref(), MAX_TOKEN_LENGTH) {
+                    keys.push(BitmapKey {
+                        account_id,
+                        collection,
+                        class: BitmapClass::word(token.word.as_ref(), field),
+                        block_num: 0,
+                    });
+                    if !last_token.is_empty() {
+                        bigrams.insert(
+                            BitmapHash::new(&format!("{} {}", last_token, token.word)).hash,
+                        );
+                    }
+                    last_token = token.word;
+                }
+
+                match keys.len().cmp(&1) {
+                    std::cmp::Ordering::Less => None,
+                    std::cmp::Ordering::Equal => self.get_bitmaps_intersection(keys).await?,
+                    std::cmp::Ordering::Greater => {
+                        if let Some(document_ids) = self.get_bitmaps_intersection(keys).await? {
+                            let mut results = RoaringBitmap::new();
+                            for document_id in document_ids {
+                                if let Some(bigram_index) = self
+                                    .get_value::<BigramIndex>(ValueKey {
+                                        account_id,
+                                        collection,
+                                        document_id,
+                                        class: ValueClass::TermIndex,
+                                    })
+                                    .await?
+                                {
+                                    if bigrams.iter().all(|bigram| {
+                                        bigram_index.grams.binary_search(bigram).is_ok()
+                                    }) {
+                                        results.insert(document_id);
+                                    }
+                                }
+                            }
+
+                            if !results.is_empty() {
+                                Some(results)
+                            } else {
+                                None
+                            }
+                        } else {
+                            None
+                        }
+                    }
+                }
             }
             FtsFilter::Contains {
                 field,
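
This is the heart of the change. An exact (phrase) match no longer intersects bigram bitmaps from the index; it intersects the ordinary per-word bitmaps, then confirms word adjacency by fetching each candidate's BigramIndex from the key-value store (the same ValueClass::TermIndex value written during indexing) and checking that every query bigram is present. The binary_search is valid because the hashes were serialized from a BTreeSet in ascending order. A condensed sketch of the verification step (contains_all is an illustrative helper, not a name from the diff):

    struct BigramIndex {
        grams: Vec<[u8; 8]>, // sorted ascending, as serialized
    }

    impl BigramIndex {
        fn contains_all(&self, query_bigrams: &[[u8; 8]]) -> bool {
            query_bigrams
                .iter()
                .all(|b| self.grams.binary_search(b).is_ok())
        }
    }

The apparent trade-off: the bitmap index shrinks (one key per word instead of one per word plus one per adjacent pair), at the cost of one value lookup per candidate document that survives the word intersection.
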
@@ -238,6 +276,27 @@ impl Store {
     }
 }

+impl Deserialize for BigramIndex {
+    fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
+        let bytes = lz4_flex::decompress_size_prepended(bytes)
+            .map_err(|_| Error::InternalError("Failed to decompress term index".to_string()))?;
+
+        let (num_items, pos) = bytes.read_leb128::<usize>().ok_or(Error::InternalError(
+            "Failed to read term index marker".to_string(),
+        ))?;
+
+        bytes
+            .get(pos..pos + (num_items * 8))
+            .map(|bytes| Self {
+                grams: bytes
+                    .chunks_exact(8)
+                    .map(|chunk| chunk.try_into().unwrap())
+                    .collect(),
+            })
+            .ok_or_else(|| Error::InternalError("Failed to read term index".to_string()))
+    }
+}
+
 impl<T: Into<u8> + Display + Clone + std::fmt::Debug> From<FtsFilter<T>> for State<T> {
     fn from(value: FtsFilter<T>) -> Self {
         Self {
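
Here chunks_exact(8) splits the flat prefix back into [u8; 8] entries, and the try_into().unwrap() cannot panic because chunks_exact guarantees every chunk is exactly 8 bytes. The resulting Vec preserves the ascending order the writer produced, which is what the binary_search in the query path relies on.
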
@@ -34,18 +34,18 @@ impl BitmapClass {
     }

     pub fn stemmed(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
         BitmapClass::Text {
             field: field.into() | 1 << 6,
             token: BitmapHash::new(token),
         }
     }

-    pub fn bigram(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
-        BitmapClass::Text {
-            field: field.into() | 1 << 7,
-            token: BitmapHash::new(token),
-        }
-    }
+    /*pub fn bigram(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
+        BitmapClass::Text {
+            field: field.into() | 1 << 7,
+            token: BitmapHash::new(token),
+        }
+    }*/
 }

 impl BitmapHash {
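
In both BitmapClass and TokenType (next hunk), the field byte doubles as a token-type tag: the low six bits carry the field id, bit 6 marks a stemmed token, and bit 7 marked a bigram, which is why the bigram constructors can simply be commented out now that bigrams live outside the bitmap index. Worked out on field 3:

    const STEMMED_FLAG: u8 = 1 << 6; // 0b0100_0000
    const BIGRAM_FLAG: u8 = 1 << 7; // 0b1000_0000, retired by this commit

    fn main() {
        let field = 3u8; // must fit in the low six bits
        assert_eq!(STEMMED_FLAG | field, 0b0100_0011);
        assert_eq!(BIGRAM_FLAG | field, 0b1000_0011);
    }
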
@@ -84,75 +84,10 @@ impl TokenType {
     }

     pub fn stemmed(field: u8) -> u8 {
         1 << 6 | field
     }

-    pub fn bigram(field: u8) -> u8 {
-        1 << 7 | field
-    }
+    /*pub fn bigram(field: u8) -> u8 {
+        1 << 7 | field
+    }*/
 }
-
-/*
-
-const AHASHER: ahash::RandomState = ahash::RandomState::with_seeds(
-    0xaf1f2242106c64b3,
-    0x60ca4cfb4b3ed0ce,
-    0xc7dbc0bb615e82b3,
-    0x520ad065378daf88,
-);
-lazy_static::lazy_static! {
-    static ref SIPHASHER: siphasher::sip::SipHasher13 =
-        siphasher::sip::SipHasher13::new_with_keys(0x56205cbdba8f02a6, 0xbd0dbc4bb06d687b);
-}
-
-let h1 = xxhash_rust::xxh3::xxh3_64(item).to_le_bytes();
-let h2 = farmhash::hash64(item).to_le_bytes();
-let h3 = AHASHER.hash_one(item).to_le_bytes();
-let mut sh = *SIPHASHER;
-sh.write(item.as_ref());
-let h4 = sh.finish().to_le_bytes();
-
-result[..2].copy_from_slice(&h1[..2]);
-result[2..4].copy_from_slice(&h2[..2]);
-result[4..6].copy_from_slice(&h3[..2]);
-result[6..8].copy_from_slice(&h4[..2]);
-
-impl KeySerializer {
-    pub fn hash_text(mut self, item: impl AsRef<[u8]>) -> Self {
-        let item = item.as_ref();
-
-        if item.len() <= 8 {
-            self.buf.extend_from_slice(item);
-        } else {
-            let h1 = xxhash_rust::xxh3::xxh3_64(item).to_le_bytes();
-            let h2 = farmhash::hash64(item).to_le_bytes();
-            let h3 = AHASHER.hash_one(item).to_le_bytes();
-            let mut sh = *SIPHASHER;
-            sh.write(item.as_ref());
-            let h4 = sh.finish().to_le_bytes();
-
-            match item.len() {
-                9..=16 => {
-                    self.buf.extend_from_slice(&h1[..2]);
-                    self.buf.extend_from_slice(&h2[..2]);
-                    self.buf.extend_from_slice(&h3[..2]);
-                    self.buf.extend_from_slice(&h4[..2]);
-                }
-                17..=32 => {
-                    self.buf.extend_from_slice(&h1[..3]);
-                    self.buf.extend_from_slice(&h2[..3]);
-                    self.buf.extend_from_slice(&h3[..3]);
-                    self.buf.extend_from_slice(&h4[..3]);
-                }
-                _ => {
-                    self.buf.extend_from_slice(&h1[..4]);
-                    self.buf.extend_from_slice(&h2[..4]);
-                    self.buf.extend_from_slice(&h3[..4]);
-                    self.buf.extend_from_slice(&h4[..4]);
-                }
-            }
-        }
-        self
-    }
-}
-*/
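
Besides retiring TokenType::bigram, this hunk deletes a long-commented experiment: a KeySerializer::hash_text that concatenated 2-, 3-, or 4-byte prefixes of four independent digests (xxh3, farmhash, seeded aHash, and keyed SipHash-1-3), widening the combined hash with token length. BitmapHash appears to have superseded that scheme.
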
@@ -29,7 +29,10 @@ use std::io::Read;

 use ::store::Store;

-use store::backend::{elastic::ElasticSearchStore, rocksdb::RocksDbStore};
+use store::{
+    backend::{elastic::ElasticSearchStore, rocksdb::RocksDbStore},
+    FtsStore,
+};
 use utils::config::Config;

 pub struct TempDir {
@@ -72,8 +75,8 @@ pub async fn store_tests() {
     //let db: Store = PostgresStore::open(&Config::new(&config_file).unwrap())
     //let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap())
     let db: Store = RocksDbStore::open(&config).await.unwrap().into();
-    //let fts_store = FtsStore::from(db.clone());
-    let fts_store = ElasticSearchStore::open(&config).await.unwrap().into();
+    let fts_store = FtsStore::from(db.clone());
+    //let fts_store = ElasticSearchStore::open(&config).await.unwrap().into();

     if insert {
         db.destroy().await;
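
The test harness mirrors the runtime change: the store tests now run full-text search through FtsStore::from(db.clone()) on the internal RocksDB-backed store, with the ElasticSearch line kept as a commented-out alternative.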