ElasticSearch backend implementation

This commit is contained in:
mdecimus 2023-12-03 19:40:16 +01:00
parent 2ccf85d6dd
commit 7e94a08067
20 changed files with 693 additions and 66 deletions

165
Cargo.lock generated
View file

@ -249,6 +249,19 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-compression"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5"
dependencies = [
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-recursion"
version = "1.0.5"
@ -416,6 +429,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf"
[[package]]
name = "base64"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
[[package]]
name = "base64"
version = "0.13.1"
@ -1128,9 +1147,9 @@ dependencies = [
[[package]]
name = "crypto-mac"
version = "0.10.1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bff07008ec701e8028e2ceb8f83f0e4274ee62bd2dbdc4fefff2e9a91824081a"
checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6"
dependencies = [
"generic-array",
"subtle",
@ -1178,7 +1197,7 @@ dependencies = [
"digest 0.10.7",
"fiat-crypto",
"platforms",
"rustc_version",
"rustc_version 0.4.0",
"subtle",
"zeroize",
]
@ -1194,14 +1213,38 @@ dependencies = [
"syn 2.0.39",
]
[[package]]
name = "darling"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c"
dependencies = [
"darling_core 0.13.4",
"darling_macro 0.13.4",
]
[[package]]
name = "darling"
version = "0.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e"
dependencies = [
"darling_core",
"darling_macro",
"darling_core 0.20.3",
"darling_macro 0.20.3",
]
[[package]]
name = "darling_core"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn 1.0.109",
]
[[package]]
@ -1218,13 +1261,24 @@ dependencies = [
"syn 2.0.39",
]
[[package]]
name = "darling_macro"
version = "0.13.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835"
dependencies = [
"darling_core 0.13.4",
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling_macro"
version = "0.20.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
dependencies = [
"darling_core",
"darling_core 0.20.3",
"quote",
"syn 2.0.39",
]
@ -1323,9 +1377,9 @@ dependencies = [
[[package]]
name = "deranged"
version = "0.3.9"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3"
checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc"
dependencies = [
"powerfmt",
"serde",
@ -1595,6 +1649,26 @@ dependencies = [
"serde",
]
[[package]]
name = "elasticsearch"
version = "8.5.0-alpha.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40d9bd57d914cc66ce878f098f63ed7b5d5b64c30644a5adb950b008f874a6c6"
dependencies = [
"base64 0.11.0",
"bytes",
"dyn-clone",
"lazy_static",
"percent-encoding",
"reqwest",
"rustc_version 0.2.3",
"serde",
"serde_json",
"serde_with",
"url",
"void",
]
[[package]]
name = "elliptic-curve"
version = "0.13.8"
@ -2554,7 +2628,7 @@ dependencies = [
"parking_lot",
"rand",
"rustls 0.21.9",
"rustls-pemfile 1.0.4",
"rustls-pemfile 2.0.0",
"store",
"tokio",
"tokio-rustls",
@ -3160,7 +3234,7 @@ dependencies = [
"md5",
"parking_lot",
"rustls 0.21.9",
"rustls-pemfile 1.0.4",
"rustls-pemfile 2.0.0",
"sieve-rs",
"store",
"tokio",
@ -3313,7 +3387,7 @@ version = "0.30.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f"
dependencies = [
"darling",
"darling 0.20.3",
"heck",
"num-bigint",
"proc-macro-crate 1.3.1",
@ -4469,6 +4543,7 @@ version = "0.11.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b"
dependencies = [
"async-compression",
"base64 0.21.5",
"bytes",
"encoding_rs",
@ -4632,9 +4707,9 @@ dependencies = [
[[package]]
name = "rsa"
version = "0.9.5"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af6c4b23d99685a1408194da11270ef8e9809aff951cc70ec9b17350b087e474"
checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc"
dependencies = [
"const-oid",
"digest 0.10.7",
@ -4754,13 +4829,22 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver 0.9.0",
]
[[package]]
name = "rustc_version"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
dependencies = [
"semver",
"semver 1.0.20",
]
[[package]]
@ -4972,12 +5056,27 @@ dependencies = [
"libc",
]
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver"
version = "1.0.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090"
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "sequoia-openpgp"
version = "1.17.0"
@ -5084,6 +5183,28 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_with"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff"
dependencies = [
"serde",
"serde_with_macros",
]
[[package]]
name = "serde_with_macros"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082"
dependencies = [
"darling 0.13.4",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "serial_test"
version = "2.0.0"
@ -5286,7 +5407,7 @@ dependencies = [
"regex",
"reqwest",
"rustls 0.21.9",
"rustls-pemfile 1.0.4",
"rustls-pemfile 2.0.0",
"serde",
"serde_json",
"sha1",
@ -5299,7 +5420,7 @@ dependencies = [
"tracing",
"unicode-security",
"utils",
"webpki-roots 0.25.3",
"webpki-roots 0.26.0",
"whatlang",
"x509-parser",
]
@ -5641,6 +5762,7 @@ dependencies = [
"ahash 0.8.6",
"blake3",
"deadpool-postgres",
"elasticsearch",
"farmhash",
"foundationdb",
"futures",
@ -5661,6 +5783,7 @@ dependencies = [
"rust-s3",
"rustls 0.21.9",
"serde",
"serde_json",
"tokio",
"tokio-postgres",
"tokio-rustls",
@ -5711,9 +5834,9 @@ dependencies = [
[[package]]
name = "subtle"
version = "2.4.1"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc"
[[package]]
name = "syn"
@ -6537,6 +6660,12 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "void"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "want"
version = "0.3.1"

View file

@ -15,7 +15,7 @@ utils = { path = "../utils" }
mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "ludicrous_mode"] }
mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] }
rustls = "0.21.0"
rustls-pemfile = "1.0"
rustls-pemfile = "2.0"
tokio = { version = "1.23", features = ["full"] }
tokio-rustls = { version = "0.24.0"}
parking_lot = "0.12"

View file

@ -48,7 +48,7 @@ use services::{
};
use smtp::core::SMTP;
use store::{
backend::{rocksdb::RocksDbStore, sqlite::SqliteStore},
backend::{elastic::ElasticSearchStore, rocksdb::RocksDbStore},
fts::FtsFilter,
parking_lot::Mutex,
query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet},
@ -201,11 +201,11 @@ impl JMAP {
.await
.failed("Unable to open database"),
));*/
let store = Store::SQLite(Arc::new(
/*let store = Store::SQLite(Arc::new(
SqliteStore::open(config)
.await
.failed("Unable to open database"),
));
));*/
/*let store = Store::FoundationDb(Arc::new(
FdbStore::open(config)
.await
@ -216,17 +216,22 @@ impl JMAP {
.await
.failed("Unable to open database"),
));*/
/*let store = Store::RocksDb(Arc::new(
let store = Store::RocksDb(Arc::new(
RocksDbStore::open(config)
.await
.failed("Unable to open database"),
));*/
));
let blob_store = store.clone().into();
/*let blob_store = BlobStore::Fs(Arc::new(
FsStore::open(config)
.await
.failed("Unable to open blob store"),
));*/
//let fts_store = FtsStore::Store(store.clone());
let fts_store = ElasticSearchStore::open(config)
.await
.failed("Unable to open FTS store")
.into();
let jmap_server = Arc::new(JMAP {
directory: directory_config
@ -241,7 +246,7 @@ impl JMAP {
.property::<u64>("global.node-id")?
.map(SnowflakeIdGenerator::with_node_id)
.unwrap_or_else(SnowflakeIdGenerator::new),
fts_store: FtsStore::Store(store.clone()),
fts_store,
store,
blob_store,
config: Config::new(config).failed("Invalid configuration file"),

View file

@ -31,10 +31,12 @@ tracing = "0.1"
jemallocator = "0.5.0"
[features]
default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks"]
#default = []
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"]
default = ["rocks", "elastic"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]
mysql = ["store/mysql"]
rocks = ["store/rocks"]
elastic = ["store/elastic"]
s3 = ["store/s3"]

View file

@ -16,7 +16,7 @@ mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features =
mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] }
sieve-rs = { git = "https://github.com/stalwartlabs/sieve" }
rustls = "0.21.0"
rustls-pemfile = "1.0"
rustls-pemfile = "2.0"
tokio = { version = "1.23", features = ["full"] }
tokio-rustls = { version = "0.24.0"}
parking_lot = "0.12"

View file

@ -23,10 +23,10 @@ smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
sieve-rs = { git = "https://github.com/stalwartlabs/sieve" }
ahash = { version = "0.8" }
rustls = "0.21.0"
rustls-pemfile = "1.0"
rustls-pemfile = "2.0"
tokio = { version = "1.23", features = ["full"] }
tokio-rustls = { version = "0.24.0"}
webpki-roots = { version = "0.25"}
webpki-roots = { version = "0.26"}
hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
http-body-util = "0.1.0"

View file

@ -10,7 +10,7 @@ nlp = { path = "../nlp" }
rocksdb = { version = "0.21", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"], optional = true }
rusqlite = { version = "0.29.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"] }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"], optional = true }
tokio = { version = "1.23", features = ["sync", "fs", "io-util"] }
r2d2 = { version = "0.8.10", optional = true }
futures = { version = "0.3", optional = true }
@ -34,6 +34,8 @@ tokio-rustls = { version = "0.24.0", optional = true }
rustls = { version = "0.21.0", optional = true }
ring = { version = "0.17", optional = true }
mysql_async = { version = "0.33", default-features = false, features = ["default-rustls"], optional = true }
elasticsearch = { version = "8.5.0-alpha.1", default-features = false, features = ["rustls-tls"], optional = true }
serde_json = {version = "1.0.64", optional = true }
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
@ -42,7 +44,9 @@ tokio = { version = "1.23", features = ["full"] }
rocks = ["rocksdb", "rayon", "num_cpus"]
sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "lru-cache"]
postgres = ["tokio-postgres", "deadpool-postgres", "tokio-rustls", "rustls", "ring", "futures"]
elastic = ["elasticsearch", "serde_json"]
mysql = ["mysql_async"]
s3 = ["rust-s3"]
foundation = ["foundationdb", "futures"]
test_mode = []

View file

@ -0,0 +1,140 @@
use std::{borrow::Cow, fmt::Display};
use elasticsearch::{DeleteByQueryParts, IndexParts};
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::{
backend::elastic::INDEX_NAMES,
fts::{index::FtsDocument, Field},
};
use super::ElasticSearchStore;
#[derive(Serialize, Deserialize, Default)]
struct Document<'x> {
document_id: u32,
account_id: u32,
body: Vec<Cow<'x, str>>,
attachments: Vec<Cow<'x, str>>,
keywords: Vec<Cow<'x, str>>,
header: Vec<Header<'x>>,
}
#[derive(Serialize, Deserialize)]
struct Header<'x> {
name: Cow<'x, str>,
value: Cow<'x, str>,
}
impl ElasticSearchStore {
pub async fn fts_index<T: Into<u8> + Display + Clone + std::fmt::Debug>(
&self,
document: FtsDocument<'_, T>,
) -> crate::Result<()> {
self.index
.index(IndexParts::Index(INDEX_NAMES[document.collection as usize]))
.body(Document::from(document))
.send()
.await
.map_err(Into::into)
.and_then(|response| {
if response.status_code().is_success() {
Ok(())
} else {
Err(crate::Error::InternalError(format!(
"Failed to index document: {:?}",
response
)))
}
})
}
pub async fn fts_remove(
&self,
account_id: u32,
collection: u8,
document_id: u32,
) -> crate::Result<bool> {
self.index
.delete_by_query(DeleteByQueryParts::Index(&[
INDEX_NAMES[collection as usize]
]))
.body(json!({
"query": {
"bool": {
"must": [
{ "match": { "account_id": account_id } },
{ "match": { "document_id": document_id } }
]
}
}
}))
.send()
.await
.map_err(Into::into)
.and_then(|response| {
if response.status_code().is_success() {
Ok(true)
} else {
Err(crate::Error::InternalError(format!(
"Failed to remove document: {:?}",
response
)))
}
})
}
pub async fn fts_remove_all(&self, account_id: u32) -> crate::Result<()> {
self.index
.delete_by_query(DeleteByQueryParts::Index(INDEX_NAMES))
.body(json!({
"query": {
"bool": {
"must": [
{ "match": { "account_id": account_id } },
]
}
}
}))
.send()
.await
.map_err(Into::into)
.and_then(|response| {
if response.status_code().is_success() {
Ok(())
} else {
Err(crate::Error::InternalError(format!(
"Failed to remove document: {:?}",
response
)))
}
})
}
}
impl<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> From<FtsDocument<'x, T>>
    for Document<'x>
{
    /// Converts a generic FTS document into its ElasticSearch representation
    /// by sorting every text part into the bucket for its field.
    fn from(value: FtsDocument<'x, T>) -> Self {
        let mut doc = Document {
            account_id: value.account_id,
            document_id: value.document_id,
            ..Default::default()
        };
        for part in value.parts {
            match part.field {
                Field::Body => doc.body.push(part.text),
                Field::Attachment => doc.attachments.push(part.text),
                Field::Keyword => doc.keywords.push(part.text),
                // Headers keep their name so queries can match on it.
                Field::Header(name) => doc.header.push(Header {
                    name: name.to_string().into(),
                    value: part.text,
                }),
            }
        }
        doc
    }
}

View file

@ -0,0 +1,160 @@
use elasticsearch::{
auth::Credentials,
cert::CertificateValidation,
http::{
transport::{BuildError, SingleNodeConnectionPool, Transport, TransportBuilder},
StatusCode, Url,
},
indices::{IndicesCreateParts, IndicesExistsParts},
Elasticsearch, Error,
};
use serde_json::json;
use utils::config::Config;
pub mod index;
pub mod query;
/// Full-text search store backed by an external ElasticSearch cluster.
pub struct ElasticSearchStore {
    /// ElasticSearch client handle used for all index, query and delete requests.
    index: Elasticsearch,
}
/// Index name per collection; the collection id is used as an index into this
/// slice. Currently only the e-mail collection is indexed.
pub(crate) static INDEX_NAMES: &[&str] = &["stalwart_email"];
impl ElasticSearchStore {
    /// Opens a connection to an ElasticSearch cluster and makes sure the
    /// e-mail index exists.
    ///
    /// Configuration keys read:
    /// - `store.fts.url` or `store.fts.cloud-id`: endpoint (one required).
    /// - `store.fts.user` / `store.fts.password`: optional basic-auth
    ///   credentials; the password becomes required once a user is set, and
    ///   both are required for cloud-id connections.
    /// - `store.fts.allow-invalid-certs`: disables TLS certificate
    ///   validation (URL connections only; defaults to false).
    /// - `store.fts.shards` / `store.fts.replicas`: index settings applied
    ///   when the index is first created (defaults: 3 / 0).
    pub async fn open(config: &Config) -> crate::Result<Self> {
        let credentials = if let Some(user) = config.value("store.fts.user") {
            let password = config.value_require("store.fts.password")?;
            Some(Credentials::Basic(user.to_string(), password.to_string()))
        } else {
            None
        };

        let es = if let Some(url) = config.value("store.fts.url") {
            let url = Url::parse(url).map_err(|e| {
                crate::Error::InternalError(format!("Invalid store.fts.url: {}", e))
            })?;
            let conn_pool = SingleNodeConnectionPool::new(url);
            let mut builder = TransportBuilder::new(conn_pool);
            if let Some(credentials) = credentials {
                builder = builder.auth(credentials);
            }
            if config.property_or_static::<bool>("store.fts.allow-invalid-certs", "false")? {
                builder = builder.cert_validation(CertificateValidation::None);
            }
            Self {
                index: Elasticsearch::new(builder.build()?),
            }
        } else if let Some(cloud_id) = config.value("store.fts.cloud-id") {
            Self {
                index: Elasticsearch::new(Transport::cloud(
                    cloud_id,
                    // Cloud connections cannot be anonymous.
                    credentials.ok_or_else(|| {
                        crate::Error::InternalError(
                            "Missing store.fts.user or store.fts.password".to_string(),
                        )
                    })?,
                )?),
            }
        } else {
            // Fixed: previously said "store.fts.cloud_id" although the key
            // actually read above is "store.fts.cloud-id".
            return Err(crate::Error::InternalError(
                "Missing store.fts.url or store.fts.cloud-id".to_string(),
            ));
        };

        es.create_index(
            config.property_or_static("store.fts.shards", "3")?,
            config.property_or_static("store.fts.replicas", "0")?,
        )
        .await?;

        Ok(es)
    }

    /// Creates the e-mail index with its field mappings and default analyzer
    /// if it does not exist yet; an existing index is left untouched.
    async fn create_index(&self, shards: usize, replicas: usize) -> crate::Result<()> {
        let exists = self
            .index
            .indices()
            .exists(IndicesExistsParts::Index(&[INDEX_NAMES[0]]))
            .send()
            .await?;

        if exists.status_code() == StatusCode::NOT_FOUND {
            // NOTE(review): the property names below ("attachment",
            // "keyword", "body", "header") must stay in sync with the field
            // names produced by the document serializer and used by the
            // query builder.
            let response = self
                .index
                .indices()
                .create(IndicesCreateParts::Index(INDEX_NAMES[0]))
                .body(json!({
                  "mappings": {
                    "properties": {
                      "document_id": {
                        "type": "integer"
                      },
                      "account_id": {
                        "type": "integer"
                      },
                      "header": {
                        "type": "object",
                        "properties": {
                          "name": {
                            "type": "keyword"
                          },
                          "value": {
                            "type": "text",
                            "analyzer": "default_analyzer",
                          }
                        }
                      },
                      "body": {
                        "analyzer": "default_analyzer",
                        "type": "text"
                      },
                      "attachment": {
                        "analyzer": "default_analyzer",
                        "type": "text"
                      },
                      "keyword": {
                        "type": "keyword"
                      }
                    }
                  },
                  "settings": {
                    "index.number_of_shards": shards,
                    "index.number_of_replicas": replicas,
                    "analysis": {
                      "analyzer": {
                        "default_analyzer": {
                          "type": "custom",
                          "tokenizer": "standard",
                          "filter": ["lowercase"]
                        }
                      }
                    }
                  }
                }))
                .send()
                .await?;

            if !response.status_code().is_success() {
                // Fixed typo: "ElastiSearch" -> "ElasticSearch".
                return Err(crate::Error::InternalError(format!(
                    "Error while creating ElasticSearch index: {:?}",
                    response
                )));
            }
        }

        Ok(())
    }
}
// Map client transport/API errors into the crate's error type so `?` can be
// used directly on ElasticSearch calls.
impl From<Error> for crate::Error {
    fn from(value: Error) -> Self {
        crate::Error::InternalError(format!("Elasticsearch error: {}", value))
    }
}
// Same for errors raised while building the transport in `open`.
impl From<BuildError> for crate::Error {
    fn from(value: BuildError) -> Self {
        crate::Error::InternalError(format!("Elasticsearch build error: {}", value))
    }
}

View file

@ -0,0 +1,124 @@
use std::{borrow::Cow, fmt::Display};
use elasticsearch::SearchParts;
use roaring::RoaringBitmap;
use serde_json::{json, Value};
use crate::fts::{Field, FtsFilter};
use super::{ElasticSearchStore, INDEX_NAMES};
impl ElasticSearchStore {
    /// Compiles a list of `FtsFilter`s into a single ElasticSearch boolean
    /// query, executes it against the collection's index and returns the ids
    /// of all matching documents as a bitmap.
    ///
    /// Filters arrive in prefix notation: an `And`/`Or`/`Not` marker opens a
    /// nested group that is terminated by a matching `End`; every other
    /// variant is a leaf condition. Results are always restricted to
    /// `account_id`.
    pub async fn fts_query<T: Into<u8> + Display + Clone + std::fmt::Debug>(
        &self,
        account_id: u32,
        collection: impl Into<u8>,
        filters: Vec<FtsFilter<T>>,
    ) -> crate::Result<RoaringBitmap> {
        // Saved (operator, conditions) state of the enclosing groups while a
        // nested group is being collected.
        let mut stack: Vec<(FtsFilter<T>, Vec<Value>)> = vec![];
        // The account filter is always part of the top-level conjunction.
        let mut conditions = vec![json!({ "match": { "account_id": account_id } })];
        let mut logical_op = FtsFilter::And;
        for filter in filters {
            let is_exact = matches!(filter, FtsFilter::Exact { .. });
            match filter {
                FtsFilter::Exact { field, text, .. }
                | FtsFilter::Contains { field, text, .. }
                | FtsFilter::Keyword { field, text, .. } => {
                    // Exact filters use a non-analyzed "term" query, all other
                    // leaves use an analyzed "match" query.
                    // NOTE(review): "term" against the analyzed text fields
                    // (body, attachment, header.value) matches a single token;
                    // confirm whether a match_phrase was intended for exact
                    // multi-word searches.
                    let match_type = if is_exact { "term" } else { "match" };
                    if let Field::Header(name) = field {
                        // Headers are stored as name/value objects, so match
                        // both the header name and its value.
                        conditions.push(json!({"bool": {
                            "must": [
                                {
                                    "term": {
                                        "header.name": name.to_string()
                                    }
                                },
                                {
                                    match_type: {
                                        "header.value": text
                                    }
                                }
                            ]
                        }}));
                    } else {
                        conditions.push(json!({
                            match_type: { field.name(): text }
                        }));
                    }
                }
                FtsFilter::And | FtsFilter::Or | FtsFilter::Not => {
                    // Open a nested group: save the current state and start
                    // collecting the group's conditions.
                    stack.push((logical_op, conditions));
                    logical_op = filter;
                    conditions = Vec::new();
                }
                FtsFilter::End => {
                    // Close the innermost group and fold it into its parent as
                    // must / should / must_not depending on the operator.
                    // Unbalanced `End`s (empty stack) are silently ignored.
                    if let Some((prev_logical_op, mut prev_conditions)) = stack.pop() {
                        if !conditions.is_empty() {
                            match logical_op {
                                FtsFilter::And => {
                                    prev_conditions.push(json!({ "bool": { "must": conditions } }));
                                }
                                FtsFilter::Or => {
                                    prev_conditions
                                        .push(json!({ "bool": { "should": conditions } }));
                                }
                                FtsFilter::Not => {
                                    prev_conditions
                                        .push(json!({ "bool": { "must_not": conditions } }));
                                }
                                // Only And/Or/Not are ever pushed above.
                                _ => unreachable!(),
                            }
                        }
                        logical_op = prev_logical_op;
                        conditions = prev_conditions;
                    }
                }
            }
        }
        // TODO implement pagination -- a single request returns at most the
        // first 10000 hits (ElasticSearch's default result window).
        let response = self
            .index
            .search(SearchParts::Index(&[
                INDEX_NAMES[collection.into() as usize]
            ]))
            .body(json!({
                "query": {
                    "bool": {
                        "must": conditions,
                    }
                },
                "size": 10000,
                "_source": ["document_id"]
            }))
            .send()
            .await?
            .error_for_status_code()?;
        // Only document_id is requested in _source; collect the ids into a
        // bitmap.
        let json: Value = response.json().await?;
        let mut results = RoaringBitmap::new();
        for hit in json["hits"]["hits"].as_array().ok_or_else(|| {
            crate::Error::InternalError("Invalid response from ElasticSearch".to_string())
        })? {
            results.insert(hit["_source"]["document_id"].as_u64().ok_or_else(|| {
                crate::Error::InternalError("Invalid response from ElasticSearch".to_string())
            })? as u32);
        }
        Ok(results)
    }
}
impl<T: Into<u8> + Display + Clone + std::fmt::Debug> Field<T> {
    /// Returns the ElasticSearch field name this field is indexed under.
    /// Only header fields require an allocation; the rest are static.
    pub fn name(&self) -> Cow<'static, str> {
        match self {
            Field::Body => Cow::Borrowed("body"),
            Field::Attachment => Cow::Borrowed("attachment"),
            Field::Keyword => Cow::Borrowed("keyword"),
            Field::Header(name) => Cow::Owned(format!("header.{name}")),
        }
    }
}

View file

@ -21,20 +21,22 @@
* for more details.
*/
#[cfg(feature = "elastic")]
pub mod elastic;
#[cfg(feature = "foundation")]
pub mod foundationdb;
pub mod fs;
#[cfg(feature = "mysql")]
pub mod mysql;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "rocks")]
pub mod rocksdb;
#[cfg(feature = "s3")]
pub mod s3;
#[cfg(feature = "sqlite")]
pub mod sqlite;
pub mod fs;
pub mod s3;
pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize;
pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1;

View file

@ -354,6 +354,7 @@ impl BlobStore {
pub async fn get_blob(&self, key: &[u8], range: Range<u32>) -> crate::Result<Option<Vec<u8>>> {
match self {
Self::Fs(store) => store.get_blob(key, range).await,
#[cfg(feature = "s3")]
Self::S3(store) => store.get_blob(key, range).await,
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.get_blob(key, range).await,
@ -371,6 +372,7 @@ impl BlobStore {
pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> {
match self {
Self::Fs(store) => store.put_blob(key, data).await,
#[cfg(feature = "s3")]
Self::S3(store) => store.put_blob(key, data).await,
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.put_blob(key, data).await,
@ -388,6 +390,7 @@ impl BlobStore {
pub async fn delete_blob(&self, key: &[u8]) -> crate::Result<bool> {
match self {
Self::Fs(store) => store.delete_blob(key).await,
#[cfg(feature = "s3")]
Self::S3(store) => store.delete_blob(key).await,
#[cfg(feature = "sqlite")]
Self::Sqlite(store) => store.delete_blob(key).await,
@ -410,6 +413,8 @@ impl FtsStore {
) -> crate::Result<()> {
match self {
FtsStore::Store(store) => store.fts_index(document).await,
#[cfg(feature = "elastic")]
FtsStore::ElasticSearch(store) => store.fts_index(document).await,
}
}
@ -421,6 +426,10 @@ impl FtsStore {
) -> crate::Result<RoaringBitmap> {
match self {
FtsStore::Store(store) => store.fts_query(account_id, collection, filters).await,
#[cfg(feature = "elastic")]
FtsStore::ElasticSearch(store) => {
store.fts_query(account_id, collection, filters).await
}
}
}
@ -432,12 +441,18 @@ impl FtsStore {
) -> crate::Result<bool> {
match self {
FtsStore::Store(store) => store.fts_remove(account_id, collection, document_id).await,
#[cfg(feature = "elastic")]
FtsStore::ElasticSearch(store) => {
store.fts_remove(account_id, collection, document_id).await
}
}
}
pub async fn remove_all(&self, account_id: u32) -> crate::Result<()> {
match self {
FtsStore::Store(store) => store.fts_remove_all(account_id).await,
#[cfg(feature = "elastic")]
FtsStore::ElasticSearch(store) => store.fts_remove_all(account_id).await,
}
}
}

View file

@ -45,14 +45,14 @@ use crate::{
use super::Field;
#[derive(Debug)]
struct Text<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> {
field: Field<T>,
text: Cow<'x, str>,
typ: Type,
pub(crate) struct Text<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> {
pub field: Field<T>,
pub text: Cow<'x, str>,
pub typ: Type,
}
#[derive(Debug)]
enum Type {
pub(crate) enum Type {
Text(Language),
Tokenize,
Keyword,
@ -60,11 +60,11 @@ enum Type {
#[derive(Debug)]
pub struct FtsDocument<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> {
parts: Vec<Text<'x, T>>,
default_language: Language,
account_id: u32,
collection: u8,
document_id: u32,
pub(crate) parts: Vec<Text<'x, T>>,
pub(crate) default_language: Language,
pub(crate) account_id: u32,
pub(crate) collection: u8,
pub(crate) document_id: u32,
}
impl<'x, T: Into<u8> + Display + Clone + std::fmt::Debug> FtsDocument<'x, T> {

View file

@ -28,7 +28,7 @@ use nlp::language::Language;
pub mod index;
pub mod query;
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Field<T: Into<u8> + Display + Clone + std::fmt::Debug> {
Header(T),
Body,
@ -36,7 +36,7 @@ pub enum Field<T: Into<u8> + Display + Clone + std::fmt::Debug> {
Keyword,
}
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq)]
pub enum FtsFilter<T: Into<u8> + Display + Clone + std::fmt::Debug> {
Exact {
field: Field<T>,
@ -70,12 +70,20 @@ impl<T: Into<u8> + Display + Clone + std::fmt::Debug> FtsFilter<T> {
pub fn has_text(field: Field<T>, text: impl Into<String>, language: Language) -> Self {
let text = text.into();
if !matches!(language, Language::None) && (text.starts_with('"') && text.ends_with('"'))
|| (text.starts_with('\'') && text.ends_with('\''))
let (is_exact, text) = if let Some(text) = text
.strip_prefix('"')
.and_then(|t| t.strip_suffix('"'))
.or_else(|| text.strip_prefix('\'').and_then(|t| t.strip_suffix('\'')))
{
(true, text.to_string())
} else {
(false, text)
};
if !matches!(language, Language::None) && is_exact {
FtsFilter::Exact {
field,
text,
text: text.to_string(),
language,
}
} else {

View file

@ -30,13 +30,16 @@ pub mod query;
pub mod write;
pub use ahash;
use backend::{fs::FsStore, s3::S3Store};
use backend::fs::FsStore;
pub use blake3;
pub use parking_lot;
pub use rand;
pub use roaring;
use write::{BitmapClass, BlobOp, ValueClass};
#[cfg(feature = "s3")]
use backend::s3::S3Store;
#[cfg(feature = "postgres")]
use backend::postgres::PostgresStore;
@ -52,6 +55,9 @@ use backend::foundationdb::FdbStore;
#[cfg(feature = "rocks")]
use backend::rocksdb::RocksDbStore;
#[cfg(feature = "elastic")]
use backend::elastic::ElasticSearchStore;
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> crate::Result<Self>;
}
@ -197,6 +203,7 @@ pub enum Store {
#[derive(Clone)]
pub enum BlobStore {
Fs(Arc<FsStore>),
#[cfg(feature = "s3")]
S3(Arc<S3Store>),
#[cfg(feature = "sqlite")]
Sqlite(Arc<SqliteStore>),
@ -213,6 +220,8 @@ pub enum BlobStore {
#[derive(Clone)]
pub enum FtsStore {
Store(Store),
#[cfg(feature = "elastic")]
ElasticSearch(Arc<ElasticSearchStore>),
}
#[cfg(feature = "sqlite")]
@ -256,12 +265,20 @@ impl From<FsStore> for BlobStore {
}
}
#[cfg(feature = "s3")]
impl From<S3Store> for BlobStore {
fn from(store: S3Store) -> Self {
Self::S3(Arc::new(store))
}
}
#[cfg(feature = "elastic")]
impl From<ElasticSearchStore> for FtsStore {
fn from(store: ElasticSearchStore) -> Self {
Self::ElasticSearch(Arc::new(store))
}
}
impl From<Store> for FtsStore {
fn from(store: Store) -> Self {
Self::Store(store)

View file

@ -5,13 +5,15 @@ edition = "2021"
resolver = "2"
[features]
default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks"]
#default = []
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"]
default = ["rocks", "elastic"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]
mysql = ["store/mysql"]
rocks = ["store/rocks"]
elastic = ["store/elastic"]
s3 = ["store/s3"]
[dev-dependencies]
store = { path = "../crates/store", features = ["test_mode"] }

View file

@ -147,6 +147,12 @@ database = "stalwart"
user = "root"
password = "password"
[store.fts]
url = "https://localhost:9200"
user = "elastic"
password = "RtQ-Lu6+o4rxx=XJplVJ"
allow-invalid-certs = true
[store.blob]
type = "local"

View file

@ -145,6 +145,12 @@ database = "stalwart"
user = "root"
password = "password"
[store.fts]
url = "https://localhost:9200"
user = "elastic"
password = "RtQ-Lu6+o4rxx=XJplVJ"
allow-invalid-certs = true
[store.blob]
type = "local"

View file

@ -29,7 +29,7 @@ use std::io::Read;
use ::store::Store;
use store::backend::{rocksdb::RocksDbStore, sqlite::SqliteStore};
use store::backend::{elastic::ElasticSearchStore, rocksdb::RocksDbStore};
use utils::config::Config;
pub struct TempDir {
@ -52,25 +52,33 @@ database = "stalwart"
user = "root"
password = "password"
[store.fts]
url = "https://localhost:9200"
user = "elastic"
password = "RtQ-Lu6+o4rxx=XJplVJ"
allow-invalid-certs = true
"#;
#[tokio::test]
pub async fn store_tests() {
let insert = true;
//let insert = true;
let insert = false;
let temp_dir = TempDir::new("store_tests", insert);
let config_file = CONFIG.replace("{TMP}", &temp_dir.path.to_string_lossy());
let db: Store = SqliteStore::open(&Config::new(&config_file).unwrap())
//let db: Store = FdbStore::open(&Config::new(&config_file).unwrap())
//let db: Store = PostgresStore::open(&Config::new(&config_file).unwrap())
//let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap())
//let db: Store = RocksDbStore::open(&Config::new(&config_file).unwrap())
.await
.unwrap()
.into();
let config = Config::new(&config_file).unwrap();
//let db: Store = SqliteStore::open(&Config::new(&config_file).unwrap())
//let db: Store = FdbStore::open(&Config::new(&config_file).unwrap())
//let db: Store = PostgresStore::open(&Config::new(&config_file).unwrap())
//let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap())
let db: Store = RocksDbStore::open(&config).await.unwrap().into();
//let fts_store = FtsStore::from(db.clone());
let fts_store = ElasticSearchStore::open(&config).await.unwrap().into();
if insert {
db.destroy().await;
}
query::test(db.clone(), insert).await;
query::test(db.clone(), fts_store, insert).await;
assign_id::test(db).await;
if insert {
temp_dir.delete();

View file

@ -110,7 +110,7 @@ impl From<FieldId> for u8 {
}
impl Display for FieldId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", FIELDS[self.0 as usize], self.0)
write!(f, "{}", FIELDS[self.0 as usize])
}
}
@ -125,9 +125,8 @@ impl FieldId {
}
#[allow(clippy::mutex_atomic)]
pub async fn test(db: Store, do_insert: bool) {
pub async fn test(db: Store, fts_store: FtsStore, do_insert: bool) {
println!("Running Store query tests...");
let fts_store = FtsStore::from(db.clone());
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(8)