Redis lookup backend implementation

This commit is contained in:
mdecimus 2023-12-12 18:45:52 +01:00
parent 78afc703f5
commit b7869901ee
15 changed files with 586 additions and 6 deletions

78
Cargo.lock generated
View file

@ -938,6 +938,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "combine"
version = "4.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "console"
version = "0.15.7"
@ -1000,6 +1014,12 @@ dependencies = [
"libc",
]
[[package]]
name = "crc16"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
[[package]]
name = "crc32fast"
version = "1.3.2"
@ -4347,6 +4367,37 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "redis"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd"
dependencies = [
"async-trait",
"bytes",
"combine",
"crc16",
"futures",
"futures-util",
"itoa",
"log",
"percent-encoding",
"pin-project-lite",
"rand",
"rustls 0.21.10",
"rustls-native-certs",
"rustls-pemfile 1.0.4",
"rustls-webpki 0.101.7",
"ryu",
"sha1_smol",
"socket2 0.4.10",
"tokio",
"tokio-rustls 0.24.1",
"tokio-util",
"url",
"webpki-roots 0.23.1",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@ -4833,6 +4884,16 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7673e0aa20ee4937c6aacfc12bb8341cfbf054cdd21df6bec5fd0629fe9339b"
[[package]]
name = "rustls-webpki"
version = "0.100.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f6a5fc258f1c1276dfe3016516945546e2d5383911efc0fc4f1cdc5df3a4ae3"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
name = "rustls-webpki"
version = "0.101.7"
@ -5168,6 +5229,12 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "sha1collisiondetection"
version = "0.3.2"
@ -5467,6 +5534,7 @@ dependencies = [
"async-trait",
"blake3",
"bytes",
"deadpool",
"deadpool-postgres",
"elasticsearch",
"farmhash",
@ -5483,6 +5551,7 @@ dependencies = [
"r2d2",
"rand",
"rayon",
"redis",
"regex",
"reqwest",
"ring 0.17.7",
@ -6509,6 +6578,15 @@ dependencies = [
"webpki",
]
[[package]]
name = "webpki-roots"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338"
dependencies = [
"rustls-webpki 0.100.3",
]
[[package]]
name = "webpki-roots"
version = "0.25.3"

View file

@ -31,8 +31,8 @@ tracing = "0.1"
jemallocator = "0.5.0"
[features]
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"]
default = ["sqlite", "postgres", "mysql", "foundationdb", "rocks"]
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["sqlite", "postgres", "mysql", "redis"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]
@ -40,3 +40,4 @@ mysql = ["store/mysql"]
rocks = ["store/rocks"]
elastic = ["store/elastic"]
s3 = ["store/s3"]
redis = ["store/redis"]

View file

@ -42,6 +42,8 @@ regex = "1.7.0"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls-webpki-roots", "blocking"] }
flate2 = "1.0"
async-trait = "0.1.68"
redis = { version = "0.24.0", features = [ "tokio-comp", "tokio-rustls-comp", "tls-rustls-insecure", "tls-rustls-webpki-roots", "cluster-async"], optional = true }
deadpool = { version = "0.10.0", features = ["managed"], optional = true }
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }
@ -55,6 +57,7 @@ mysql = ["mysql_async"]
s3 = ["rust-s3"]
foundation = ["foundationdb", "futures"]
fdb-chunked-bm = []
redis = ["dep:redis", "deadpool"]
test_mode = []

View file

@ -31,6 +31,8 @@ pub mod memory;
pub mod mysql;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "redis")]
pub mod redis;
#[cfg(feature = "rocks")]
pub mod rocksdb;
#[cfg(feature = "s3")]

View file

@ -0,0 +1,90 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use redis::AsyncCommands;
use crate::{Deserialize, LookupKey, LookupValue};
use super::{RedisPool, RedisStore};
impl RedisStore {
pub async fn key_set(&self, key: Vec<u8>, value: LookupValue<Vec<u8>>) -> crate::Result<()> {
match &self.pool {
RedisPool::Single(pool) => self.key_set_(pool.get().await?.as_mut(), key, value).await,
RedisPool::Cluster(pool) => self.key_set_(pool.get().await?.as_mut(), key, value).await,
}
}
pub async fn key_get<T: Deserialize + std::fmt::Debug + 'static>(
&self,
key: LookupKey,
) -> crate::Result<LookupValue<T>> {
match &self.pool {
RedisPool::Single(pool) => self.key_get_(pool.get().await?.as_mut(), key).await,
RedisPool::Cluster(pool) => self.key_get_(pool.get().await?.as_mut(), key).await,
}
}
async fn key_get_<T: Deserialize + std::fmt::Debug + 'static>(
&self,
conn: &mut impl AsyncCommands,
key: LookupKey,
) -> crate::Result<LookupValue<T>> {
match key {
LookupKey::Key(key) => {
if let Some(value) = conn.get::<_, Option<Vec<u8>>>(key).await? {
T::deserialize(&value).map(|value| LookupValue::Value { value, expires: 0 })
} else {
Ok(LookupValue::None)
}
}
LookupKey::Counter(key) => {
let value: Option<i64> = conn.get(key).await?;
Ok(LookupValue::Counter {
num: value.unwrap_or(0),
})
}
}
}
async fn key_set_(
&self,
conn: &mut impl AsyncCommands,
key: Vec<u8>,
value: LookupValue<Vec<u8>>,
) -> crate::Result<()> {
match value {
LookupValue::Value { value, expires } => {
if expires > 0 {
conn.set_ex(key, value, expires).await?;
} else {
conn.set(key, value).await?;
}
}
LookupValue::Counter { num } => conn.incr(key, num).await?,
LookupValue::None => (),
}
Ok(())
}
}

View file

@ -0,0 +1,159 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::time::Duration;
use deadpool::{
managed::{Manager, Pool, PoolError},
Runtime,
};
use redis::{
cluster::{ClusterClient, ClusterClientBuilder},
Client, RedisError,
};
use utils::config::{utils::AsKey, Config};
pub mod lookup;
pub mod pool;
pub struct RedisStore {
pool: RedisPool,
}
struct RedisConnectionManager {
client: Client,
timeout: Duration,
}
struct RedisClusterConnectionManager {
client: ClusterClient,
timeout: Duration,
}
enum RedisPool {
Single(Pool<RedisConnectionManager>),
Cluster(Pool<RedisClusterConnectionManager>),
}
impl RedisStore {
pub async fn open(config: &Config, prefix: impl AsKey) -> crate::Result<Self> {
let prefix = prefix.as_key();
let db = if let Some(url) = config.value((&prefix, "url")) {
Self {
pool: RedisPool::Single(build_pool(
config,
&prefix,
RedisConnectionManager {
client: Client::open(url)?,
timeout: config.property_or_static((&prefix, "timeout"), "10s")?,
},
)?),
}
} else {
let addresses = config
.values((&prefix, "urls"))
.map(|(_, v)| v.to_string())
.collect::<Vec<_>>();
if addresses.is_empty() {
return Err(crate::Error::InternalError(format!(
"No Redis cluster URLs specified for {prefix:?}"
)));
}
let mut builder = ClusterClientBuilder::new(addresses.into_iter());
if let Some(value) = config.property((&prefix, "username"))? {
builder = builder.username(value);
}
if let Some(value) = config.property((&prefix, "password"))? {
builder = builder.password(value);
}
if let Some(value) = config.property((&prefix, "retries"))? {
builder = builder.retries(value);
}
if let Some(value) = config.property::<Duration>((&prefix, "max-retry-wait"))? {
builder = builder.max_retry_wait(value.as_secs());
}
if let Some(value) = config.property::<Duration>((&prefix, "min-retry-wait"))? {
builder = builder.min_retry_wait(value.as_secs());
}
if let Some(true) = config.property::<bool>((&prefix, "read-from-replicas"))? {
builder = builder.read_from_replicas();
}
Self {
pool: RedisPool::Cluster(build_pool(
config,
&prefix,
RedisClusterConnectionManager {
client: builder.build()?,
timeout: config.property_or_static((&prefix, "timeout"), "10s")?,
},
)?),
}
};
Ok(db)
}
}
fn build_pool<M: Manager>(
config: &Config,
prefix: &str,
manager: M,
) -> utils::config::Result<Pool<M>> {
Pool::builder(manager)
.runtime(Runtime::Tokio1)
.max_size(config.property_or_static((prefix, "pool.max-connections"), "10")?)
.create_timeout(
config
.property_or_static::<Duration>((prefix, "pool.create-timeout"), "30s")?
.into(),
)
.wait_timeout(config.property_or_static((prefix, "pool.wait-timeout"), "30s")?)
.recycle_timeout(config.property_or_static((prefix, "pool.recycle-timeout"), "30s")?)
.build()
.map_err(|err| {
format!(
"Failed to build pool for {prefix:?}: {err}",
prefix = prefix,
err = err
)
})
}
impl From<PoolError<RedisError>> for crate::Error {
fn from(value: PoolError<RedisError>) -> Self {
crate::Error::InternalError(format!("Redis pool error: {}", value))
}
}
impl From<PoolError<crate::Error>> for crate::Error {
fn from(value: PoolError<crate::Error>) -> Self {
crate::Error::InternalError(format!("Connection pool {}", value))
}
}
impl From<RedisError> for crate::Error {
fn from(value: RedisError) -> Self {
crate::Error::InternalError(format!("Redis error: {}", value))
}
}

View file

@ -0,0 +1,83 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use async_trait::async_trait;
use deadpool::managed;
use redis::{
aio::{Connection, ConnectionLike},
cluster_async::ClusterConnection,
};
use super::{RedisClusterConnectionManager, RedisConnectionManager};
#[async_trait]
impl managed::Manager for RedisConnectionManager {
type Type = Connection;
type Error = crate::Error;
async fn create(&self) -> Result<Connection, crate::Error> {
match tokio::time::timeout(self.timeout, self.client.get_tokio_connection()).await {
Ok(conn) => conn.map_err(Into::into),
Err(_) => Err(crate::Error::InternalError(
"Redis connection timeout".into(),
)),
}
}
async fn recycle(
&self,
conn: &mut Connection,
_: &managed::Metrics,
) -> managed::RecycleResult<crate::Error> {
conn.req_packed_command(&redis::cmd("PING"))
.await
.map(|_| ())
.map_err(|err| managed::RecycleError::Backend(err.into()))
}
}
#[async_trait]
impl managed::Manager for RedisClusterConnectionManager {
type Type = ClusterConnection;
type Error = crate::Error;
async fn create(&self) -> Result<ClusterConnection, crate::Error> {
match tokio::time::timeout(self.timeout, self.client.get_async_connection()).await {
Ok(conn) => conn.map_err(Into::into),
Err(_) => Err(crate::Error::InternalError(
"Redis connection timeout".into(),
)),
}
}
async fn recycle(
&self,
conn: &mut ClusterConnection,
_: &managed::Metrics,
) -> managed::RecycleResult<crate::Error> {
conn.req_packed_command(&redis::cmd("PING"))
.await
.map(|_| ())
.map_err(|err| managed::RecycleError::Backend(err.into()))
}
}

View file

@ -53,6 +53,9 @@ use crate::backend::rocksdb::RocksDbStore;
#[cfg(feature = "elastic")]
use crate::backend::elastic::ElasticSearchStore;
#[cfg(feature = "redis")]
use crate::backend::redis::RedisStore;
#[async_trait]
pub trait ConfigStore {
async fn parse_stores(&self) -> utils::config::Result<Stores>;
@ -161,6 +164,13 @@ impl ConfigStore for Config {
);
continue;
}
#[cfg(feature = "redis")]
"redis" => {
config
.lookup_stores
.insert(store_id, RedisStore::open(self, prefix).await?.into());
continue;
}
"memory" => {
let prefix = prefix.as_key();
for lookup_id in self.sub_keys((&prefix, "lookup")) {

View file

@ -49,6 +49,10 @@ impl LookupStore {
)),
},
LookupStore::Memory(store) => store.query(query, params),
#[cfg(feature = "redis")]
LookupStore::Redis(_) => Err(crate::Error::InternalError(
"Redis does not support queries".into(),
)),
};
tracing::trace!( context = "store", event = "query", query = query, result = ?result);
@ -81,6 +85,8 @@ impl LookupStore {
batch.ops.push(Operation::Value { class, op });
store.write(batch.build()).await
}
#[cfg(feature = "redis")]
LookupStore::Redis(store) => store.key_set(key, value).await,
LookupStore::Memory(_) => unimplemented!(),
}
}
@ -110,6 +116,8 @@ impl LookupStore {
.await
.map(|num| LookupValue::Counter { num }),
},
#[cfg(feature = "redis")]
LookupStore::Redis(store) => store.key_get(key).await,
LookupStore::Memory(_) => unimplemented!(),
}
}
@ -137,7 +145,7 @@ impl LookupStore {
store
.iterate(IterateParams::new(from_key, to_key), |key, value| {
if value.deserialize_be_u64(0)? < current_time {
expired_keys.push(key.to_vec());
expired_keys.push(key.get(1..).unwrap_or_default().to_vec());
}
Ok(true)
})
@ -159,6 +167,8 @@ impl LookupStore {
}
}
}
#[cfg(feature = "redis")]
LookupStore::Redis(store) => {}
LookupStore::Memory(_) => {}
}

View file

@ -434,7 +434,7 @@ impl Store {
value
);
}
SUBSPACE_INDEX_VALUES if key[0] >= 2 => {
SUBSPACE_INDEX_VALUES if key[0] >= 3 => {
// Ignore named keys
return Ok(true);
}

View file

@ -60,6 +60,9 @@ use backend::rocksdb::RocksDbStore;
#[cfg(feature = "elastic")]
use backend::elastic::ElasticSearchStore;
#[cfg(feature = "redis")]
use backend::redis::RedisStore;
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> crate::Result<Self>;
}
@ -236,6 +239,8 @@ pub enum FtsStore {
pub enum LookupStore {
Store(Store),
Memory(Arc<MemoryStore>),
#[cfg(feature = "redis")]
Redis(Arc<RedisStore>),
}
#[cfg(feature = "sqlite")]
@ -293,6 +298,13 @@ impl From<ElasticSearchStore> for FtsStore {
}
}
#[cfg(feature = "redis")]
impl From<RedisStore> for LookupStore {
fn from(store: RedisStore) -> Self {
Self::Redis(Arc::new(store))
}
}
impl From<Store> for FtsStore {
fn from(store: Store) -> Self {
Self::Store(store)

View file

@ -5,8 +5,8 @@ edition = "2021"
resolver = "2"
[features]
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3"]
default = ["sqlite", "postgres", "mysql", "foundationdb", "rocks"]
#default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["sqlite", "postgres", "mysql", "redis"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]
@ -14,6 +14,7 @@ mysql = ["store/mysql"]
rocks = ["store/rocks"]
elastic = ["store/elastic"]
s3 = ["store/s3"]
redis = ["store/redis"]
[dev-dependencies]
store = { path = "../crates/store", features = ["test_mode"] }

View file

@ -46,6 +46,10 @@ duplicate-expiry = "7d"
type = "sqlite"
path = "%PATH%/test_antispam.db"
#[store."redis"]
#type = "redis"
#url = "redis://127.0.0.1"
[store."default"]
type = "memory"

121
tests/src/store/lookup.rs Normal file
View file

@ -0,0 +1,121 @@
/*
* Copyright (c) 2023, Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use store::{config::ConfigStore, LookupKey, LookupStore, LookupValue};
use utils::config::Config;
use crate::store::{TempDir, CONFIG};
#[tokio::test]
pub async fn lookup_tests() {
let temp_dir = TempDir::new("lookup_tests", true);
let config =
Config::new(&CONFIG.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())).unwrap();
let stores = config.parse_stores().await.unwrap();
for (store_id, store) in stores.lookup_stores {
println!("Testing lookup store {}...", store_id);
if let LookupStore::Store(store) = &store {
store.destroy().await;
}
// Test value expiry
let key = "xyz".as_bytes().to_vec();
assert_eq!(
LookupValue::None,
store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap()
);
store
.key_set(
key.clone(),
LookupValue::Value {
value: "hello".to_string().into_bytes(),
expires: 1,
},
)
.await
.unwrap();
assert!(matches!(store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap(), LookupValue::Value { value,.. } if value == "hello"));
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
assert_eq!(
LookupValue::None,
store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap()
);
store.purge_expired().await.unwrap();
if let LookupStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
}
// Test key
store
.key_set(
key.clone(),
LookupValue::Value {
value: "world".to_string().into_bytes(),
expires: 0,
},
)
.await
.unwrap();
store.purge_expired().await.unwrap();
assert!(matches!(store
.key_get::<String>(LookupKey::Key(key.clone()))
.await
.unwrap(), LookupValue::Value { value,.. } if value == "world"));
// Test counter
let key = "abc".as_bytes().to_vec();
store
.key_set(key.clone(), LookupValue::Counter { num: 1 })
.await
.unwrap();
assert_eq!(
LookupValue::Counter { num: 1 },
store
.key_get::<String>(LookupKey::Counter(key.clone()))
.await
.unwrap()
);
store
.key_set(key.clone(), LookupValue::Counter { num: 2 })
.await
.unwrap();
assert_eq!(
LookupValue::Counter { num: 3 },
store
.key_get::<String>(LookupKey::Counter(key.clone()))
.await
.unwrap()
);
}
}

View file

@ -23,6 +23,7 @@
pub mod assign_id;
pub mod blob;
pub mod lookup;
pub mod ops;
pub mod query;
@ -74,6 +75,11 @@ port = 3307
database = "stalwart"
user = "root"
password = "password"
[store."redis"]
type = "redis"
url = "redis://127.0.0.1"
"#;
#[tokio::test]