diff --git a/Cargo.lock b/Cargo.lock index c10eb207..8e1fa843 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -249,6 +249,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-compression" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc2d0cfb2a7388d34f590e76686704c494ed7aaceed62ee1ba35cbf363abc2a5" +dependencies = [ + "flate2", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-recursion" version = "1.0.5" @@ -416,6 +429,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" +[[package]] +name = "base64" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" + [[package]] name = "base64" version = "0.13.1" @@ -1128,9 +1147,9 @@ dependencies = [ [[package]] name = "crypto-mac" -version = "0.10.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bff07008ec701e8028e2ceb8f83f0e4274ee62bd2dbdc4fefff2e9a91824081a" +checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" dependencies = [ "generic-array", "subtle", @@ -1178,7 +1197,7 @@ dependencies = [ "digest 0.10.7", "fiat-crypto", "platforms", - "rustc_version", + "rustc_version 0.4.0", "subtle", "zeroize", ] @@ -1194,14 +1213,38 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "darling" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a01d95850c592940db9b8194bc39f4bc0e89dee5c4265e4b1807c34a9aba453c" +dependencies = [ + "darling_core 0.13.4", + "darling_macro 0.13.4", +] + [[package]] name = "darling" version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.3", + "darling_macro 0.20.3", +] + +[[package]] +name = "darling_core" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859d65a907b6852c9361e3185c862aae7fafd2887876799fa55f5f99dc40d610" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 1.0.109", ] [[package]] @@ -1218,13 +1261,24 @@ dependencies = [ "syn 2.0.39", ] +[[package]] +name = "darling_macro" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c972679f83bdf9c42bd905396b6c3588a843a17f0f16dfcfa3e2c5d57441835" +dependencies = [ + "darling_core 0.13.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ - "darling_core", + "darling_core 0.20.3", "quote", "syn 2.0.39", ] @@ -1323,9 +1377,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f32d04922c60427da6f9fef14d042d9edddef64cb9d4ce0d64d0685fbeb1fd3" +checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" dependencies = [ "powerfmt", "serde", @@ -1595,6 +1649,26 @@ dependencies = [ "serde", ] +[[package]] +name = "elasticsearch" +version = "8.5.0-alpha.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40d9bd57d914cc66ce878f098f63ed7b5d5b64c30644a5adb950b008f874a6c6" +dependencies = [ + "base64 0.11.0", + "bytes", + "dyn-clone", + "lazy_static", + "percent-encoding", + "reqwest", + "rustc_version 0.2.3", + "serde", + "serde_json", + "serde_with", + "url", + "void", +] + [[package]] name = "elliptic-curve" version = "0.13.8" @@ -2554,7 +2628,7 @@ dependencies = [ "parking_lot", "rand", "rustls 0.21.9", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.0.0", "store", "tokio", "tokio-rustls", @@ -3160,7 +3234,7 @@ dependencies = [ "md5", "parking_lot", "rustls 0.21.9", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.0.0", "sieve-rs", "store", "tokio", @@ -3313,7 +3387,7 @@ version = "0.30.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56b0d8a0db9bf6d2213e11f2c701cb91387b0614361625ab7b9743b41aa4938f" dependencies = [ - "darling", + "darling 0.20.3", "heck", "num-bigint", "proc-macro-crate 1.3.1", @@ -4469,6 +4543,7 @@ version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ + "async-compression", "base64 0.21.5", "bytes", "encoding_rs", @@ -4632,9 +4707,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.9.5" +version = "0.9.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "af6c4b23d99685a1408194da11270ef8e9809aff951cc70ec9b17350b087e474" +checksum = "5d0e5124fcb30e76a7e79bfee683a2746db83784b86289f6251b54b7950a0dfc" dependencies = [ "const-oid", "digest 0.10.7", @@ -4754,13 +4829,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver 0.9.0", +] + [[package]] name = "rustc_version" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver", + "semver 1.0.20", ] [[package]] @@ -4972,12 +5056,27 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + [[package]] name = "semver" version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836fa6a3e1e547f9a2c4040802ec865b5d85f4014efe00555d7090a3dcaa1090" +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + [[package]] name = "sequoia-openpgp" version = "1.17.0" @@ -5084,6 +5183,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling 0.13.4", + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "serial_test" version = "2.0.0" @@ -5286,7 +5407,7 @@ dependencies = [ "regex", "reqwest", "rustls 0.21.9", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.0.0", "serde", "serde_json", "sha1", @@ -5299,7 +5420,7 @@ dependencies = [ "tracing", "unicode-security", "utils", - "webpki-roots 0.25.3", + "webpki-roots 0.26.0", "whatlang", "x509-parser", ] @@ -5641,6 +5762,7 @@ dependencies = [ "ahash 0.8.6", "blake3", "deadpool-postgres", + "elasticsearch", "farmhash", "foundationdb", "futures", @@ -5661,6 +5783,7 @@ dependencies = [ "rust-s3", "rustls 0.21.9", "serde", + "serde_json", "tokio", "tokio-postgres", "tokio-rustls", @@ -5711,9 +5834,9 @@ dependencies = [ [[package]] name = "subtle" -version = "2.4.1" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "syn" @@ -6537,6 +6660,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "void" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" + [[package]] name = "want" version = "0.3.1" diff --git a/crates/imap/Cargo.toml b/crates/imap/Cargo.toml index d8fa824c..a14b6d5a 100644 --- a/crates/imap/Cargo.toml +++ b/crates/imap/Cargo.toml @@ -15,7 +15,7 @@ utils = { path = "../utils" } mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "ludicrous_mode"] } mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] } rustls = "0.21.0" -rustls-pemfile = "1.0" +rustls-pemfile = "2.0" tokio = { version = "1.23", features = ["full"] } tokio-rustls = { version = "0.24.0"} parking_lot = "0.12" diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index 89684284..459275d7 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -48,7 +48,7 @@ use services::{ }; use smtp::core::SMTP; use store::{ - backend::{rocksdb::RocksDbStore, sqlite::SqliteStore}, + backend::{elastic::ElasticSearchStore, rocksdb::RocksDbStore}, fts::FtsFilter, parking_lot::Mutex, query::{sort::Pagination, Comparator, Filter, ResultSet, SortedResultSet}, @@ -201,11 +201,11 @@ impl JMAP { .await .failed("Unable to open database"), ));*/ - let store = Store::SQLite(Arc::new( + /*let store = Store::SQLite(Arc::new( SqliteStore::open(config) .await .failed("Unable to open database"), - )); + ));*/ /*let store = Store::FoundationDb(Arc::new( FdbStore::open(config) .await @@ -216,17 +216,22 @@ impl JMAP { .await .failed("Unable to open database"), ));*/ - /*let store = Store::RocksDb(Arc::new( + let store = Store::RocksDb(Arc::new( RocksDbStore::open(config) .await .failed("Unable to open database"), - ));*/ + )); let blob_store = store.clone().into(); /*let blob_store = BlobStore::Fs(Arc::new( FsStore::open(config) .await .failed("Unable to open blob store"), ));*/ + //let fts_store = FtsStore::Store(store.clone()); + let fts_store = ElasticSearchStore::open(config) + .await + .failed("Unable to open FTS store") + .into(); let jmap_server = Arc::new(JMAP { directory: directory_config @@ -241,7 +246,7 @@ impl JMAP { .property::("global.node-id")? .map(SnowflakeIdGenerator::with_node_id) .unwrap_or_else(SnowflakeIdGenerator::new), - fts_store: FtsStore::Store(store.clone()), + fts_store, store, blob_store, config: Config::new(config).failed("Invalid configuration file"), diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index 4aaba837..f838c19c 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -31,10 +31,12 @@ tracing = "0.1" jemallocator = "0.5.0" [features] -default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks"] -#default = [] +#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"] +default = ["rocks", "elastic"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation"] postgres = ["store/postgres"] mysql = ["store/mysql"] rocks = ["store/rocks"] +elastic = ["store/elastic"] +s3 = ["store/s3"] diff --git a/crates/managesieve/Cargo.toml b/crates/managesieve/Cargo.toml index ef5e04bc..df918b6d 100644 --- a/crates/managesieve/Cargo.toml +++ b/crates/managesieve/Cargo.toml @@ -16,7 +16,7 @@ mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] } sieve-rs = { git = "https://github.com/stalwartlabs/sieve" } rustls = "0.21.0" -rustls-pemfile = "1.0" +rustls-pemfile = "2.0" tokio = { version = "1.23", features = ["full"] } tokio-rustls = { version = "0.24.0"} parking_lot = "0.12" diff --git a/crates/smtp/Cargo.toml b/crates/smtp/Cargo.toml index b0c543d1..cbb320a2 100644 --- a/crates/smtp/Cargo.toml +++ b/crates/smtp/Cargo.toml @@ -23,10 +23,10 @@ smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" } sieve-rs = { git = "https://github.com/stalwartlabs/sieve" } ahash = { version = "0.8" } rustls = "0.21.0" -rustls-pemfile = "1.0" +rustls-pemfile = "2.0" tokio = { version = "1.23", features = ["full"] } tokio-rustls = { version = "0.24.0"} -webpki-roots = { version = "0.25"} +webpki-roots = { version = "0.26"} hyper = { version = "1.0.1", features = ["server", "http1", "http2"] } hyper-util = { version = "0.1.1", features = ["tokio"] } http-body-util = "0.1.0" diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 5c7c4e21..c113f33b 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -10,7 +10,7 @@ nlp = { path = "../nlp" } rocksdb = { version = "0.21", optional = true, features = ["multi-threaded-cf"] } foundationdb = { version = "0.8.0", features = ["embedded-fdb-include"], optional = true } rusqlite = { version = "0.29.0", features = ["bundled"], optional = true } -rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"] } +rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"], optional = true } tokio = { version = "1.23", features = ["sync", "fs", "io-util"] } r2d2 = { version = "0.8.10", optional = true } futures = { version = "0.3", optional = true } @@ -34,6 +34,8 @@ tokio-rustls = { version = "0.24.0", optional = true } rustls = { version = "0.21.0", optional = true } ring = { version = "0.17", optional = true } mysql_async = { version = "0.33", default-features = false, features = ["default-rustls"], optional = true } +elasticsearch = { version = "8.5.0-alpha.1", default-features = false, features = ["rustls-tls"], optional = true } +serde_json = {version = "1.0.64", optional = true } [dev-dependencies] tokio = { version = "1.23", features = ["full"] } @@ -42,7 +44,9 @@ tokio = { version = "1.23", features = ["full"] } rocks = ["rocksdb", "rayon", "num_cpus"] sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "lru-cache"] postgres = ["tokio-postgres", "deadpool-postgres", "tokio-rustls", "rustls", "ring", "futures"] +elastic = ["elasticsearch", "serde_json"] mysql = ["mysql_async"] +s3 = ["rust-s3"] foundation = ["foundationdb", "futures"] test_mode = [] diff --git a/crates/store/src/backend/elastic/index.rs b/crates/store/src/backend/elastic/index.rs new file mode 100644 index 00000000..ab9813a9 --- /dev/null +++ b/crates/store/src/backend/elastic/index.rs @@ -0,0 +1,140 @@ +use std::{borrow::Cow, fmt::Display}; + +use elasticsearch::{DeleteByQueryParts, IndexParts}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +use crate::{ + backend::elastic::INDEX_NAMES, + fts::{index::FtsDocument, Field}, +}; + +use super::ElasticSearchStore; + +#[derive(Serialize, Deserialize, Default)] +struct Document<'x> { + document_id: u32, + account_id: u32, + body: Vec>, + attachments: Vec>, + keywords: Vec>, + header: Vec>, +} + +#[derive(Serialize, Deserialize)] +struct Header<'x> { + name: Cow<'x, str>, + value: Cow<'x, str>, +} + +impl ElasticSearchStore { + pub async fn fts_index + Display + Clone + std::fmt::Debug>( + &self, + document: FtsDocument<'_, T>, + ) -> crate::Result<()> { + self.index + .index(IndexParts::Index(INDEX_NAMES[document.collection as usize])) + .body(Document::from(document)) + .send() + .await + .map_err(Into::into) + .and_then(|response| { + if response.status_code().is_success() { + Ok(()) + } else { + Err(crate::Error::InternalError(format!( + "Failed to index document: {:?}", + response + ))) + } + }) + } + + pub async fn fts_remove( + &self, + account_id: u32, + collection: u8, + document_id: u32, + ) -> crate::Result { + self.index + .delete_by_query(DeleteByQueryParts::Index(&[ + INDEX_NAMES[collection as usize] + ])) + .body(json!({ + "query": { + "bool": { + "must": [ + { "match": { "account_id": account_id } }, + { "match": { "document_id": document_id } } + ] + } + } + })) + .send() + .await + .map_err(Into::into) + .and_then(|response| { + if response.status_code().is_success() { + Ok(true) + } else { + Err(crate::Error::InternalError(format!( + "Failed to remove document: {:?}", + response + ))) + } + }) + } + + pub async fn fts_remove_all(&self, account_id: u32) -> crate::Result<()> { + self.index + .delete_by_query(DeleteByQueryParts::Index(INDEX_NAMES)) + .body(json!({ + "query": { + "bool": { + "must": [ + { "match": { "account_id": account_id } }, + ] + } + } + })) + .send() + .await + .map_err(Into::into) + .and_then(|response| { + if response.status_code().is_success() { + Ok(()) + } else { + Err(crate::Error::InternalError(format!( + "Failed to remove document: {:?}", + response + ))) + } + }) + } +} + +impl<'x, T: Into + Display + Clone + std::fmt::Debug> From> + for Document<'x> +{ + fn from(value: FtsDocument<'x, T>) -> Self { + let mut document = Document { + account_id: value.account_id, + document_id: value.document_id, + ..Default::default() + }; + + for part in value.parts { + match part.field { + Field::Header(name) => document.header.push(Header { + name: name.to_string().into(), + value: part.text, + }), + Field::Body => document.body.push(part.text), + Field::Attachment => document.attachments.push(part.text), + Field::Keyword => document.keywords.push(part.text), + } + } + + document + } +} diff --git a/crates/store/src/backend/elastic/mod.rs b/crates/store/src/backend/elastic/mod.rs new file mode 100644 index 00000000..a8bbedf8 --- /dev/null +++ b/crates/store/src/backend/elastic/mod.rs @@ -0,0 +1,160 @@ +use elasticsearch::{ + auth::Credentials, + cert::CertificateValidation, + http::{ + transport::{BuildError, SingleNodeConnectionPool, Transport, TransportBuilder}, + StatusCode, Url, + }, + indices::{IndicesCreateParts, IndicesExistsParts}, + Elasticsearch, Error, +}; +use serde_json::json; +use utils::config::Config; + +pub mod index; +pub mod query; + +pub struct ElasticSearchStore { + index: Elasticsearch, +} + +pub(crate) static INDEX_NAMES: &[&str] = &["stalwart_email"]; + +impl ElasticSearchStore { + pub async fn open(config: &Config) -> crate::Result { + let credentials = if let Some(user) = config.value("store.fts.user") { + let password = config.value_require("store.fts.password")?; + Some(Credentials::Basic(user.to_string(), password.to_string())) + } else { + None + }; + + let es = if let Some(url) = config.value("store.fts.url") { + let url = Url::parse(url).map_err(|e| { + crate::Error::InternalError(format!("Invalid store.fts.url: {}", e)) + })?; + let conn_pool = SingleNodeConnectionPool::new(url); + let mut builder = TransportBuilder::new(conn_pool); + if let Some(credentials) = credentials { + builder = builder.auth(credentials); + } + if config.property_or_static::("store.fts.allow-invalid-certs", "false")? { + builder = builder.cert_validation(CertificateValidation::None); + } + + Self { + index: Elasticsearch::new(builder.build()?), + } + } else if let Some(cloud_id) = config.value("store.fts.cloud-id") { + Self { + index: Elasticsearch::new(Transport::cloud( + cloud_id, + credentials.ok_or_else(|| { + crate::Error::InternalError( + "Missing store.fts.user or store.fts.password".to_string(), + ) + })?, + )?), + } + } else { + return Err(crate::Error::InternalError( + "Missing store.fts.url or store.fts.cloud_id".to_string(), + )); + }; + + es.create_index( + config.property_or_static("store.fts.shards", "3")?, + config.property_or_static("store.fts.replicas", "0")?, + ) + .await?; + + Ok(es) + } + + async fn create_index(&self, shards: usize, replicas: usize) -> crate::Result<()> { + let exists = self + .index + .indices() + .exists(IndicesExistsParts::Index(&[INDEX_NAMES[0]])) + .send() + .await?; + + if exists.status_code() == StatusCode::NOT_FOUND { + let response = self + .index + .indices() + .create(IndicesCreateParts::Index(INDEX_NAMES[0])) + .body(json!({ + "mappings": { + "properties": { + "document_id": { + "type": "integer" + }, + "account_id": { + "type": "integer" + }, + "header": { + "type": "object", + "properties": { + "name": { + "type": "keyword" + }, + "value": { + "type": "text", + "analyzer": "default_analyzer", + } + } + }, + "body": { + "analyzer": "default_analyzer", + "type": "text" + }, + "attachment": { + "analyzer": "default_analyzer", + "type": "text" + }, + "keyword": { + "type": "keyword" + } + } + }, + "settings": { + "index.number_of_shards": shards, + "index.number_of_replicas": replicas, + "analysis": { + "analyzer": { + "default_analyzer": { + "type": "custom", + "tokenizer": "standard", + "filter": ["lowercase"] + } + } + } + } + })) + .send() + .await?; + + if !response.status_code().is_success() { + return Err(crate::Error::InternalError(format!( + "Error while creating ElastiSearch index: {:?}", + response + ))); + } + } + + Ok(()) + } +} + +impl From for crate::Error { + fn from(value: Error) -> Self { + crate::Error::InternalError(format!("Elasticsearch error: {}", value)) + } +} + +impl From for crate::Error { + fn from(value: BuildError) -> Self { + crate::Error::InternalError(format!("Elasticsearch build error: {}", value)) + } +} diff --git a/crates/store/src/backend/elastic/query.rs b/crates/store/src/backend/elastic/query.rs new file mode 100644 index 00000000..79650cd4 --- /dev/null +++ b/crates/store/src/backend/elastic/query.rs @@ -0,0 +1,124 @@ +use std::{borrow::Cow, fmt::Display}; + +use elasticsearch::SearchParts; +use roaring::RoaringBitmap; +use serde_json::{json, Value}; + +use crate::fts::{Field, FtsFilter}; + +use super::{ElasticSearchStore, INDEX_NAMES}; + +impl ElasticSearchStore { + pub async fn fts_query + Display + Clone + std::fmt::Debug>( + &self, + account_id: u32, + collection: impl Into, + filters: Vec>, + ) -> crate::Result { + let mut stack: Vec<(FtsFilter, Vec)> = vec![]; + let mut conditions = vec![json!({ "match": { "account_id": account_id } })]; + let mut logical_op = FtsFilter::And; + + for filter in filters { + let is_exact = matches!(filter, FtsFilter::Exact { .. }); + match filter { + FtsFilter::Exact { field, text, .. } + | FtsFilter::Contains { field, text, .. } + | FtsFilter::Keyword { field, text, .. } => { + let match_type = if is_exact { "term" } else { "match" }; + + if let Field::Header(name) = field { + conditions.push(json!({"bool": { + "must": [ + { + "term": { + "header.name": name.to_string() + } + }, + { + match_type: { + "header.value": text + } + } + ] + }})); + } else { + conditions.push(json!({ + match_type: { field.name(): text } + })); + } + } + FtsFilter::And | FtsFilter::Or | FtsFilter::Not => { + stack.push((logical_op, conditions)); + logical_op = filter; + conditions = Vec::new(); + } + FtsFilter::End => { + if let Some((prev_logical_op, mut prev_conditions)) = stack.pop() { + if !conditions.is_empty() { + match logical_op { + FtsFilter::And => { + prev_conditions.push(json!({ "bool": { "must": conditions } })); + } + FtsFilter::Or => { + prev_conditions + .push(json!({ "bool": { "should": conditions } })); + } + FtsFilter::Not => { + prev_conditions + .push(json!({ "bool": { "must_not": conditions } })); + } + _ => unreachable!(), + } + } + logical_op = prev_logical_op; + conditions = prev_conditions; + } + } + } + } + + // TODO implement pagination + let response = self + .index + .search(SearchParts::Index(&[ + INDEX_NAMES[collection.into() as usize] + ])) + .body(json!({ + "query": { + "bool": { + "must": conditions, + } + }, + "size": 10000, + "_source": ["document_id"] + })) + .send() + .await? + .error_for_status_code()?; + + let json: Value = response.json().await?; + let mut results = RoaringBitmap::new(); + + for hit in json["hits"]["hits"].as_array().ok_or_else(|| { + crate::Error::InternalError("Invalid response from ElasticSearch".to_string()) + })? { + results.insert(hit["_source"]["document_id"].as_u64().ok_or_else(|| { + crate::Error::InternalError("Invalid response from ElasticSearch".to_string()) + })? as u32); + } + + Ok(results) + } +} + +impl + Display + Clone + std::fmt::Debug> Field { + pub fn name(&self) -> Cow<'static, str> { + match self { + Field::Header(name) => format!("header.{name}").into(), + Field::Body => "body".into(), + Field::Attachment => "attachment".into(), + Field::Keyword => "keyword".into(), + } + } +} diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs index 5450623d..0ac1d8bd 100644 --- a/crates/store/src/backend/mod.rs +++ b/crates/store/src/backend/mod.rs @@ -21,20 +21,22 @@ * for more details. */ +#[cfg(feature = "elastic")] +pub mod elastic; #[cfg(feature = "foundation")] pub mod foundationdb; +pub mod fs; #[cfg(feature = "mysql")] pub mod mysql; #[cfg(feature = "postgres")] pub mod postgres; #[cfg(feature = "rocks")] pub mod rocksdb; +#[cfg(feature = "s3")] +pub mod s3; #[cfg(feature = "sqlite")] pub mod sqlite; -pub mod fs; -pub mod s3; - pub const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 1) as usize; pub const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1; diff --git a/crates/store/src/dispatch.rs b/crates/store/src/dispatch.rs index 859f194e..9699b58f 100644 --- a/crates/store/src/dispatch.rs +++ b/crates/store/src/dispatch.rs @@ -354,6 +354,7 @@ impl BlobStore { pub async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>> { match self { Self::Fs(store) => store.get_blob(key, range).await, + #[cfg(feature = "s3")] Self::S3(store) => store.get_blob(key, range).await, #[cfg(feature = "sqlite")] Self::Sqlite(store) => store.get_blob(key, range).await, @@ -371,6 +372,7 @@ impl BlobStore { pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { match self { Self::Fs(store) => store.put_blob(key, data).await, + #[cfg(feature = "s3")] Self::S3(store) => store.put_blob(key, data).await, #[cfg(feature = "sqlite")] Self::Sqlite(store) => store.put_blob(key, data).await, @@ -388,6 +390,7 @@ impl BlobStore { pub async fn delete_blob(&self, key: &[u8]) -> crate::Result { match self { Self::Fs(store) => store.delete_blob(key).await, + #[cfg(feature = "s3")] Self::S3(store) => store.delete_blob(key).await, #[cfg(feature = "sqlite")] Self::Sqlite(store) => store.delete_blob(key).await, @@ -410,6 +413,8 @@ impl FtsStore { ) -> crate::Result<()> { match self { FtsStore::Store(store) => store.fts_index(document).await, + #[cfg(feature = "elastic")] + FtsStore::ElasticSearch(store) => store.fts_index(document).await, } } @@ -421,6 +426,10 @@ impl FtsStore { ) -> crate::Result { match self { FtsStore::Store(store) => store.fts_query(account_id, collection, filters).await, + #[cfg(feature = "elastic")] + FtsStore::ElasticSearch(store) => { + store.fts_query(account_id, collection, filters).await + } } } @@ -432,12 +441,18 @@ impl FtsStore { ) -> crate::Result { match self { FtsStore::Store(store) => store.fts_remove(account_id, collection, document_id).await, + #[cfg(feature = "elastic")] + FtsStore::ElasticSearch(store) => { + store.fts_remove(account_id, collection, document_id).await + } } } pub async fn remove_all(&self, account_id: u32) -> crate::Result<()> { match self { FtsStore::Store(store) => store.fts_remove_all(account_id).await, + #[cfg(feature = "elastic")] + FtsStore::ElasticSearch(store) => store.fts_remove_all(account_id).await, } } } diff --git a/crates/store/src/fts/index.rs b/crates/store/src/fts/index.rs index 1493fdcd..596b737e 100644 --- a/crates/store/src/fts/index.rs +++ b/crates/store/src/fts/index.rs @@ -45,14 +45,14 @@ use crate::{ use super::Field; #[derive(Debug)] -struct Text<'x, T: Into + Display + Clone + std::fmt::Debug> { - field: Field, - text: Cow<'x, str>, - typ: Type, +pub(crate) struct Text<'x, T: Into + Display + Clone + std::fmt::Debug> { + pub field: Field, + pub text: Cow<'x, str>, + pub typ: Type, } #[derive(Debug)] -enum Type { +pub(crate) enum Type { Text(Language), Tokenize, Keyword, @@ -60,11 +60,11 @@ enum Type { #[derive(Debug)] pub struct FtsDocument<'x, T: Into + Display + Clone + std::fmt::Debug> { - parts: Vec>, - default_language: Language, - account_id: u32, - collection: u8, - document_id: u32, + pub(crate) parts: Vec>, + pub(crate) default_language: Language, + pub(crate) account_id: u32, + pub(crate) collection: u8, + pub(crate) document_id: u32, } impl<'x, T: Into + Display + Clone + std::fmt::Debug> FtsDocument<'x, T> { diff --git a/crates/store/src/fts/mod.rs b/crates/store/src/fts/mod.rs index cb38f1e9..c0cb837d 100644 --- a/crates/store/src/fts/mod.rs +++ b/crates/store/src/fts/mod.rs @@ -28,7 +28,7 @@ use nlp::language::Language; pub mod index; pub mod query; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum Field + Display + Clone + std::fmt::Debug> { Header(T), Body, @@ -36,7 +36,7 @@ pub enum Field + Display + Clone + std::fmt::Debug> { Keyword, } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub enum FtsFilter + Display + Clone + std::fmt::Debug> { Exact { field: Field, @@ -70,12 +70,20 @@ impl + Display + Clone + std::fmt::Debug> FtsFilter { pub fn has_text(field: Field, text: impl Into, language: Language) -> Self { let text = text.into(); - if !matches!(language, Language::None) && (text.starts_with('"') && text.ends_with('"')) - || (text.starts_with('\'') && text.ends_with('\'')) + let (is_exact, text) = if let Some(text) = text + .strip_prefix('"') + .and_then(|t| t.strip_suffix('"')) + .or_else(|| text.strip_prefix('\'').and_then(|t| t.strip_suffix('\''))) { + (true, text.to_string()) + } else { + (false, text) + }; + + if !matches!(language, Language::None) && is_exact { FtsFilter::Exact { field, - text, + text: text.to_string(), language, } } else { diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 5c649861..3d2065cf 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -30,13 +30,16 @@ pub mod query; pub mod write; pub use ahash; -use backend::{fs::FsStore, s3::S3Store}; +use backend::fs::FsStore; pub use blake3; pub use parking_lot; pub use rand; pub use roaring; use write::{BitmapClass, BlobOp, ValueClass}; +#[cfg(feature = "s3")] +use backend::s3::S3Store; + #[cfg(feature = "postgres")] use backend::postgres::PostgresStore; @@ -52,6 +55,9 @@ use backend::foundationdb::FdbStore; #[cfg(feature = "rocks")] use backend::rocksdb::RocksDbStore; +#[cfg(feature = "elastic")] +use backend::elastic::ElasticSearchStore; + pub trait Deserialize: Sized + Sync + Send { fn deserialize(bytes: &[u8]) -> crate::Result; } @@ -197,6 +203,7 @@ pub enum Store { #[derive(Clone)] pub enum BlobStore { Fs(Arc), + #[cfg(feature = "s3")] S3(Arc), #[cfg(feature = "sqlite")] Sqlite(Arc), @@ -213,6 +220,8 @@ pub enum BlobStore { #[derive(Clone)] pub enum FtsStore { Store(Store), + #[cfg(feature = "elastic")] + ElasticSearch(Arc), } #[cfg(feature = "sqlite")] @@ -256,12 +265,20 @@ impl From for BlobStore { } } +#[cfg(feature = "s3")] impl From for BlobStore { fn from(store: S3Store) -> Self { Self::S3(Arc::new(store)) } } +#[cfg(feature = "elastic")] +impl From for FtsStore { + fn from(store: ElasticSearchStore) -> Self { + Self::ElasticSearch(Arc::new(store)) + } +} + impl From for FtsStore { fn from(store: Store) -> Self { Self::Store(store) diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 56fa3d6c..d059480c 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -5,13 +5,15 @@ edition = "2021" resolver = "2" [features] -default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks"] -#default = [] +#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"] +default = ["rocks", "elastic"] sqlite = ["store/sqlite"] foundationdb = ["store/foundation"] postgres = ["store/postgres"] mysql = ["store/mysql"] rocks = ["store/rocks"] +elastic = ["store/elastic"] +s3 = ["store/s3"] [dev-dependencies] store = { path = "../crates/store", features = ["test_mode"] } diff --git a/tests/src/imap/mod.rs b/tests/src/imap/mod.rs index b1b7cc83..8b0b05be 100644 --- a/tests/src/imap/mod.rs +++ b/tests/src/imap/mod.rs @@ -147,6 +147,12 @@ database = "stalwart" user = "root" password = "password" +[store.fts] +url = "https://localhost:9200" +user = "elastic" +password = "RtQ-Lu6+o4rxx=XJplVJ" +allow-invalid-certs = true + [store.blob] type = "local" diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 6b0c0a09..ee9f8f8a 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -145,6 +145,12 @@ database = "stalwart" user = "root" password = "password" +[store.fts] +url = "https://localhost:9200" +user = "elastic" +password = "RtQ-Lu6+o4rxx=XJplVJ" +allow-invalid-certs = true + [store.blob] type = "local" diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index 4899a737..b842cc19 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -29,7 +29,7 @@ use std::io::Read; use ::store::Store; -use store::backend::{rocksdb::RocksDbStore, sqlite::SqliteStore}; +use store::backend::{elastic::ElasticSearchStore, rocksdb::RocksDbStore}; use utils::config::Config; pub struct TempDir { @@ -52,25 +52,33 @@ database = "stalwart" user = "root" password = "password" +[store.fts] +url = "https://localhost:9200" +user = "elastic" +password = "RtQ-Lu6+o4rxx=XJplVJ" +allow-invalid-certs = true + "#; #[tokio::test] pub async fn store_tests() { - let insert = true; + //let insert = true; + let insert = false; let temp_dir = TempDir::new("store_tests", insert); let config_file = CONFIG.replace("{TMP}", &temp_dir.path.to_string_lossy()); - let db: Store = SqliteStore::open(&Config::new(&config_file).unwrap()) - //let db: Store = FdbStore::open(&Config::new(&config_file).unwrap()) - //let db: Store = PostgresStore::open(&Config::new(&config_file).unwrap()) - //let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap()) - //let db: Store = RocksDbStore::open(&Config::new(&config_file).unwrap()) - .await - .unwrap() - .into(); + let config = Config::new(&config_file).unwrap(); + //let db: Store = SqliteStore::open(&Config::new(&config_file).unwrap()) + //let db: Store = FdbStore::open(&Config::new(&config_file).unwrap()) + //let db: Store = PostgresStore::open(&Config::new(&config_file).unwrap()) + //let db: Store = MysqlStore::open(&Config::new(&config_file).unwrap()) + let db: Store = RocksDbStore::open(&config).await.unwrap().into(); + //let fts_store = FtsStore::from(db.clone()); + let fts_store = ElasticSearchStore::open(&config).await.unwrap().into(); + if insert { db.destroy().await; } - query::test(db.clone(), insert).await; + query::test(db.clone(), fts_store, insert).await; assign_id::test(db).await; if insert { temp_dir.delete(); diff --git a/tests/src/store/query.rs b/tests/src/store/query.rs index 37df0f7a..71ef5acf 100644 --- a/tests/src/store/query.rs +++ b/tests/src/store/query.rs @@ -110,7 +110,7 @@ impl From for u8 { } impl Display for FieldId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{} ({})", FIELDS[self.0 as usize], self.0) + write!(f, "{}", FIELDS[self.0 as usize]) } } @@ -125,9 +125,8 @@ impl FieldId { } #[allow(clippy::mutex_atomic)] -pub async fn test(db: Store, do_insert: bool) { +pub async fn test(db: Store, fts_store: FtsStore, do_insert: bool) { println!("Running Store query tests..."); - let fts_store = FtsStore::from(db.clone()); let pool = rayon::ThreadPoolBuilder::new() .num_threads(8)