From ab895b2faef6ec20ac6361bdd33ba3df0e4990e1 Mon Sep 17 00:00:00 2001 From: Mauro D Date: Tue, 6 Jun 2023 16:53:29 +0000 Subject: [PATCH] Minio/S3 blob storage support. --- Cargo.lock | 146 ++++++++++++++++++++++- crates/jmap/src/api/admin.rs | 6 + crates/jmap/src/blob/upload.rs | 2 +- crates/store/Cargo.toml | 1 + crates/store/src/blob/mod.rs | 174 +++++++++++++++++++++++---- crates/store/src/blob/read.rs | 31 ++++- crates/store/src/blob/write.rs | 101 ++++++++++++---- tests/src/jmap/mod.rs | 7 +- tests/src/store/blob.rs | 212 +++++++++++++++++++++++++++++++++ tests/src/store/mod.rs | 6 +- 10 files changed, 628 insertions(+), 58 deletions(-) create mode 100644 tests/src/store/blob.rs diff --git a/Cargo.lock b/Cargo.lock index 23322db1..a18772f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,22 @@ dependencies = [ "num-traits", ] +[[package]] +name = "attohttpc" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fcf00bc6d5abb29b5f97e3c61a90b6d3caa12f3faf897d4a3e3607c050a35a7" +dependencies = [ + "http", + "log", + "rustls 0.20.8", + "serde", + "serde_json", + "url", + "webpki", + "webpki-roots 0.22.6", +] + [[package]] name = "atty" version = "0.2.14" @@ -244,6 +260,32 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "aws-creds" +version = "0.34.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3776743bb68d4ad02ba30ba8f64373f1be4e082fe47651767171ce75bb2f6cf5" +dependencies = [ + "attohttpc", + "dirs", + "log", + "quick-xml 0.26.0", + "rust-ini", + "serde", + "thiserror", + "time 0.3.21", + "url", +] + +[[package]] +name = "aws-region" +version = "0.25.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "056557a61427d0e5ba29dd931031c8ffed4ee7a550e7cd55692a9d8deb0a9dba" +dependencies = [ + "thiserror", +] + [[package]] name = "axum" version = "0.6.18" @@ -900,6 +942,26 @@ dependencies = [ "utils", ] +[[package]] +name = "dirs" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059" +dependencies = [ + "dirs-sys", +] + +[[package]] +name = "dirs-sys" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "displaydoc" version = "0.2.4" @@ -911,6 +973,12 @@ dependencies = [ "syn 2.0.18", ] +[[package]] +name = "dlv-list" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" + [[package]] name = "dotenvy" version = "0.15.7" @@ -1824,7 +1892,7 @@ dependencies = [ [[package]] name = "jmap-client" version = "0.3.0" -source = "git+https://github.com/stalwartlabs/jmap-client#3cba63c2246536c9b7997d7aea0124272af84ec8" +source = "git+https://github.com/stalwartlabs/jmap-client#952f272d091f75d115edaf6364a006ac6a56490a" dependencies = [ "ahash 0.8.3", "async-stream", @@ -2043,7 +2111,7 @@ dependencies = [ "mail-builder", "mail-parser", "parking_lot", - "quick-xml", + "quick-xml 0.28.2", "ring", "rustls-pemfile", "serde", @@ -2514,6 +2582,16 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "ordered-multimap" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a" +dependencies = [ + "dlv-list", + "hashbrown 0.12.3", +] + [[package]] name = "os_str_bytes" version = "6.5.0" @@ -2906,6 +2984,16 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-xml" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quick-xml" version = "0.28.2" @@ -3005,6 +3093,17 @@ dependencies = [ "bitflags 1.3.2", ] +[[package]] +name = "redox_users" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b" +dependencies = [ + "getrandom", + "redox_syscall 0.2.16", + "thiserror", +] + [[package]] name = "regex" version = "1.8.3" @@ -3175,6 +3274,48 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rust-ini" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b2ac5ff6acfbe74226fa701b5ef793aaa054055c13ebb7060ad36942956e027" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64 0.13.1", + "bytes", + "cfg-if", + "futures", + "hex", + "hmac 0.12.1", + "http", + "log", + "maybe-async 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)", + "md5", + "percent-encoding", + "quick-xml 0.26.0", + "reqwest", + "serde", + "serde_derive", + "sha2 0.10.6", + "thiserror", + "time 0.3.21", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rust-stemmers" version = "1.2.0" @@ -3886,6 +4027,7 @@ dependencies = [ "roaring", "rocksdb", "rusqlite", + "rust-s3", "rust-stemmers", "serde", "siphasher", diff --git a/crates/jmap/src/api/admin.rs b/crates/jmap/src/api/admin.rs index 46843cf7..ffa7a725 100644 --- a/crates/jmap/src/api/admin.rs +++ b/crates/jmap/src/api/admin.rs @@ -42,6 +42,12 @@ impl JMAP { document_id: 0, }) .await?; + self.store + .bulk_delete_blob(&store::BlobKind::LinkedMaildir { + account_id, + document_id: 0, + }) + .await?; // Delete mailboxes let mut batch = BatchBuilder::new(); diff --git a/crates/jmap/src/blob/upload.rs b/crates/jmap/src/blob/upload.rs index 3a6c9a87..c00b65d9 100644 --- a/crates/jmap/src/blob/upload.rs +++ b/crates/jmap/src/blob/upload.rs @@ -74,7 +74,7 @@ impl JMAP { } } - pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> Result { + pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> Result<(), MethodError> { self.store.put_blob(kind, data).await.map_err(|err| { tracing::error!( event = "error", diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 71441d08..8d45aaa6 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -10,6 +10,7 @@ maybe-async = { path = "../maybe-async" } rocksdb = { version = "0.20.1", optional = true } foundationdb = { version = "0.7.0", optional = true } rusqlite = { version = "0.29.0", features = ["bundled"], optional = true } +rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"] } tokio = { version = "1.23", features = ["sync", "fs", "io-util"] } r2d2 = { version = "0.8.10", optional = true } futures = { version = "0.3", optional = true } diff --git a/crates/store/src/blob/mod.rs b/crates/store/src/blob/mod.rs index 6ae49aef..5deedc35 100644 --- a/crates/store/src/blob/mod.rs +++ b/crates/store/src/blob/mod.rs @@ -24,22 +24,81 @@ pub mod read; pub mod write; -use std::path::{Path, PathBuf}; +use std::{path::PathBuf, time::Duration}; +use s3::{ + creds::{error::CredentialsError, Credentials}, + error::S3Error, + Bucket, Region, +}; use utils::config::Config; use crate::BlobKind; pub enum BlobStore { - Local(PathBuf), - Remote(String), + Local(BlobPaths), + Remote(Bucket), +} + +pub struct BlobPaths { + path_email: PathBuf, + path_temporary: PathBuf, + path_other: PathBuf, } impl BlobStore { pub async fn new(config: &Config) -> crate::Result { - Ok(BlobStore::Local( - config.value_require("store.blob.path")?.into(), - )) + match config.value_require("store.blob.type")? { + "s3" | "minio" | "gcs" => { + // Obtain region and endpoint from config + let region = config.value_require("store.blob.s3.region")?; + let region = if let Some(endpoint) = config.value("store.blob.s3.endpoint") { + Region::Custom { + region: region.to_string(), + endpoint: endpoint.to_string(), + } + } else { + region.parse().unwrap() + }; + let credentials = Credentials::new( + config.value("store.blob.s3.access-key"), + config.value("store.blob.s3.secret-key"), + config.value("store.blob.s3.security-token"), + config.value("store.blob.s3.session-token"), + config.value("store.blob.s3.profile"), + )?; + let timeout = + config.property_or_static::("store.blob.s3.timeout", "30s")?; + + Ok(BlobStore::Remote( + Bucket::new( + config.value_require("store.blob.s3.bucket")?, + region, + credentials, + )? + .with_path_style() + .with_request_timeout(timeout), + )) + } + "local" => { + let path = config.property_require::("store.blob.local.path")?; + let mut path_email = path.clone(); + path_email.push("emails"); + let mut path_temporary = path.clone(); + path_temporary.push("tmp"); + let mut path_other = path; + path_other.push("blobs"); + + Ok(BlobStore::Local(BlobPaths { + path_email, + path_temporary, + path_other, + })) + } + unknown => Err(crate::Error::InternalError(format!( + "Unknown blob store type: {unknown}", + ))), + } } } @@ -49,26 +108,41 @@ impl From for crate::Error { } } -fn get_path(base_path: &Path, kind: &BlobKind) -> crate::Result { - let mut path = base_path.to_path_buf(); +impl From for crate::Error { + fn from(err: S3Error) -> Self { + Self::InternalError(format!("S3 error: {}", err)) + } +} + +impl From for crate::Error { + fn from(err: CredentialsError) -> Self { + Self::InternalError(format!("S3 Credentials error: {}", err)) + } +} + +fn get_local_path(base_path: &BlobPaths, kind: &BlobKind) -> PathBuf { match kind { + BlobKind::LinkedMaildir { + account_id, + document_id, + } => { + let mut path = base_path.path_email.to_path_buf(); + path.push(format!("{:x}", account_id)); + path.push("Maildir"); + path.push("cur"); + path.push(format!("{:x}", document_id)); + path + } BlobKind::Linked { account_id, collection, document_id, } => { + let mut path = base_path.path_other.to_path_buf(); path.push(format!("{:x}", account_id)); path.push(format!("{:x}", collection)); path.push(format!("{:x}", document_id)); - } - BlobKind::LinkedMaildir { - account_id, - document_id, - } => { - path.push(format!("{:x}", account_id)); - path.push("Maildir"); - path.push("cur"); - path.push(format!("{:x}", document_id)); + path } BlobKind::Temporary { account_id, @@ -77,22 +151,27 @@ fn get_path(base_path: &Path, kind: &BlobKind) -> crate::Result { creation_day, seq, } => { - path.push("tmp"); + let mut path = base_path.path_temporary.to_path_buf(); path.push(creation_year.to_string()); path.push(creation_month.to_string()); path.push(creation_day.to_string()); path.push(format!("{:x}_{:x}", account_id, seq)); + path } } - - Ok(path) } -fn get_root_path(base_path: &Path, kind: &BlobKind) -> crate::Result { - let mut path = base_path.to_path_buf(); +fn get_local_root_path(base_path: &BlobPaths, kind: &BlobKind) -> PathBuf { match kind { - BlobKind::Linked { account_id, .. } | BlobKind::LinkedMaildir { account_id, .. } => { + BlobKind::LinkedMaildir { account_id, .. } => { + let mut path = base_path.path_email.to_path_buf(); path.push(format!("{:x}", account_id)); + path + } + BlobKind::Linked { account_id, .. } => { + let mut path = base_path.path_other.to_path_buf(); + path.push(format!("{:x}", account_id)); + path } BlobKind::Temporary { creation_year, @@ -100,12 +179,55 @@ fn get_root_path(base_path: &Path, kind: &BlobKind) -> crate::Result { creation_day, .. } => { - path.push("tmp"); + let mut path = base_path.path_temporary.to_path_buf(); path.push(creation_year.to_string()); path.push(creation_month.to_string()); path.push(creation_day.to_string()); + path } } - - Ok(path) +} + +fn get_s3_path(kind: &BlobKind) -> String { + match kind { + BlobKind::LinkedMaildir { + account_id, + document_id, + } => format!("/{:x}/{:x}", account_id, document_id), + BlobKind::Linked { + account_id, + collection, + document_id, + } => format!("/{:x}/{:x}/{:x}", account_id, collection, document_id), + BlobKind::Temporary { + account_id, + creation_year, + creation_month, + creation_day, + seq, + } => format!( + "/tmp/{}/{}/{}/{:x}_{:x}", + creation_year, creation_month, creation_day, account_id, seq + ), + } +} + +fn get_s3_root_path(kind: &BlobKind) -> String { + match kind { + BlobKind::LinkedMaildir { account_id, .. } => { + format!("/{:x}/", account_id) + } + BlobKind::Linked { account_id, .. } => { + format!("/{:x}/", account_id) + } + BlobKind::Temporary { + creation_year, + creation_month, + creation_day, + .. + } => format!( + "/tmp/{}/{}/{}/", + creation_year, creation_month, creation_day + ), + } } diff --git a/crates/store/src/blob/read.rs b/crates/store/src/blob/read.rs index d7cb80d7..953b5eb5 100644 --- a/crates/store/src/blob/read.rs +++ b/crates/store/src/blob/read.rs @@ -30,7 +30,7 @@ use tokio::{ use crate::{BlobKind, Store}; -use super::{get_path, BlobStore}; +use super::{get_local_path, get_s3_path, BlobStore}; impl Store { pub async fn get_blob( @@ -40,7 +40,7 @@ impl Store { ) -> crate::Result>> { match &self.blob { BlobStore::Local(base_path) => { - let blob_path = get_path(base_path, kind)?; + let blob_path = get_local_path(base_path, kind); let blob_size = match fs::metadata(&blob_path).await { Ok(m) => m.len(), Err(_) => return Ok(None), @@ -70,7 +70,32 @@ impl Store { buf })) } - BlobStore::Remote(_) => todo!(), + BlobStore::Remote(bucket) => { + let path = get_s3_path(kind); + let response = if range.start != 0 || range.end != u32::MAX { + bucket + .get_object_range( + path, + range.start as u64, + Some(range.end.saturating_sub(1) as u64), + ) + .await + } else { + bucket.get_object(path).await + }; + match response { + Ok(response) if (200..300).contains(&response.status_code()) => { + Ok(Some(response.to_vec())) + } + Ok(response) if response.status_code() == 404 => Ok(None), + Ok(response) => Err(crate::Error::InternalError(format!( + "S3 error code {}: {}", + response.status_code(), + String::from_utf8_lossy(response.as_slice()) + ))), + Err(err) => Err(err.into()), + } + } } } } diff --git a/crates/store/src/blob/write.rs b/crates/store/src/blob/write.rs index 8ae9da7d..3616e642 100644 --- a/crates/store/src/blob/write.rs +++ b/crates/store/src/blob/write.rs @@ -30,22 +30,33 @@ use tokio::{ use crate::{BlobKind, Store}; -use super::{get_path, get_root_path, BlobStore}; +use super::{get_local_path, get_local_root_path, get_s3_path, get_s3_root_path, BlobStore}; impl Store { - pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result { + pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result<()> { match &self.blob { BlobStore::Local(base_path) => { - let blob_path = get_path(base_path, kind)?; + let blob_path = get_local_path(base_path, kind); fs::create_dir_all(blob_path.parent().unwrap()).await?; let mut blob_file = File::create(&blob_path).await?; blob_file.write_all(data).await?; blob_file.flush().await?; - Ok(true) + Ok(()) + } + BlobStore::Remote(bucket) => { + let path = get_s3_path(kind); + match bucket.put_object(path, data).await { + Ok(response) if (200..300).contains(&response.status_code()) => Ok(()), + Ok(response) => Err(crate::Error::InternalError(format!( + "S3 error code {}: {}", + response.status_code(), + String::from_utf8_lossy(response.as_slice()) + ))), + Err(e) => Err(e.into()), + } } - BlobStore::Remote(_) => todo!(), } } @@ -55,20 +66,19 @@ impl Store { dest: &BlobKind, range: Option>, ) -> crate::Result { - match &self.blob { - BlobStore::Local(base_path) => { - let dest_path = get_path(base_path, dest)?; + if let Some(range) = range { + if let Some(bytes) = self.get_blob(src, range).await? { + self.put_blob(dest, &bytes).await?; + Ok(true) + } else { + Ok(false) + } + } else { + match &self.blob { + BlobStore::Local(base_path) => { + let dest_path = get_local_path(base_path, dest); + let src_path = get_local_path(base_path, src); - if let Some(range) = range { - if let Some(bytes) = self.get_blob(src, range).await? { - fs::create_dir_all(dest_path.parent().unwrap()).await?; - fs::write(dest_path, bytes).await?; - Ok(true) - } else { - Ok(false) - } - } else { - let src_path = get_path(base_path, src)?; if fs::metadata(&src_path).await.is_ok() { fs::create_dir_all(dest_path.parent().unwrap()).await?; fs::copy(src_path, dest_path).await?; @@ -77,15 +87,24 @@ impl Store { Ok(false) } } + BlobStore::Remote(bucket) => { + let src_path = get_s3_path(src); + let dest_path = get_s3_path(dest); + + bucket + .copy_object_internal(src_path, dest_path) + .await + .map(|code| (200..300).contains(&code)) + .map_err(|e| e.into()) + } } - BlobStore::Remote(_) => todo!(), } } pub async fn delete_blob(&self, kind: &BlobKind) -> crate::Result { match &self.blob { BlobStore::Local(base_path) => { - let blob_path = get_path(base_path, kind)?; + let blob_path = get_local_path(base_path, kind); if blob_path.exists() { fs::remove_file(&blob_path).await?; @@ -94,16 +113,52 @@ impl Store { Ok(false) } } - BlobStore::Remote(_) => todo!(), + BlobStore::Remote(bucket) => { + let path = get_s3_path(kind); + bucket + .delete_object(path) + .await + .map(|response| (200..300).contains(&response.status_code())) + .map_err(|e| e.into()) + } } } pub async fn bulk_delete_blob(&self, kind: &BlobKind) -> crate::Result<()> { match &self.blob { - BlobStore::Local(base_path) => fs::remove_dir_all(get_root_path(base_path, kind)?) + BlobStore::Local(base_path) => fs::remove_dir_all(get_local_root_path(base_path, kind)) .await .map_err(Into::into), - BlobStore::Remote(_) => todo!(), + BlobStore::Remote(bucket) => { + let prefix = get_s3_root_path(kind); + let prefix_base = prefix.strip_prefix('/').unwrap(); + let mut is_truncated = true; + while is_truncated { + for item in bucket.list(prefix.clone(), None).await? { + is_truncated = item.is_truncated && !item.contents.is_empty(); + for object in item.contents { + if object.key.starts_with(&prefix) + || object.key.starts_with(prefix_base) + { + let result = bucket.delete_object(object.key).await?; + if !(200..300).contains(&result.status_code()) { + return Err(crate::Error::InternalError(format!( + "Failed to delete bucket item, code {}: {}", + result.status_code(), + String::from_utf8_lossy(result.as_slice()) + ))); + } + } else { + tracing::debug!( + "Unexpected S3 object while deleting: {}", + item.name + ); + } + } + } + } + Ok(()) + } } } } diff --git a/tests/src/jmap/mod.rs b/tests/src/jmap/mod.rs index 2255e70b..b0e7076d 100644 --- a/tests/src/jmap/mod.rs +++ b/tests/src/jmap/mod.rs @@ -127,7 +127,12 @@ future-release = [ { if = "authenticated-as", ne = "", then = "99999999d"}, [store] db.path = "{TMP}/sqlite.db" -blob.path = "{TMP}" + +[store.blob] +type = "local" + +[store.blob.local] +path = "{TMP}" [certificate.default] cert = "file://{CERT}" diff --git a/tests/src/store/blob.rs b/tests/src/store/blob.rs new file mode 100644 index 00000000..ff880f47 --- /dev/null +++ b/tests/src/store/blob.rs @@ -0,0 +1,212 @@ +use store::{BlobKind, Store}; +use utils::config::Config; + +use crate::store::TempDir; + +const CONFIG_S3: &str = r#" +[store.db] +path = "{TMP}/_blob_s3_test_delete.db?mode=rwc" + +[store.blob] +type = "s3" + +[store.blob.s3] +access-key = "minioadmin" +secret-key = "minioadmin" +region = "eu-central-1" +endpoint = "http://localhost:9000" +bucket = "tmp" + +"#; + +const CONFIG_LOCAL: &str = r#" +[store.db] +path = "{TMP}/_blob_s3_test_delete.db?mode=rwc" + +[store.blob] +type = "local" + +[store.blob.local] +path = "{TMP}" + +"#; + +const DATA: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce erat nisl, dignissim a porttitor id, varius nec arcu. Sed mauris."; + +#[tokio::test] +pub async fn blob_s3_test() { + let temp_dir = TempDir::new("blob_tests", true); + test_blob( + Store::open( + &Config::parse( + &CONFIG_LOCAL.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap()), + ) + .unwrap(), + ) + .await + .unwrap(), + ) + .await; + test_blob( + Store::open( + &Config::parse(&CONFIG_S3.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())) + .unwrap(), + ) + .await + .unwrap(), + ) + .await; + temp_dir.delete(); +} + +async fn test_blob(store: Store) { + // Store and fetch + let kind = BlobKind::LinkedMaildir { + account_id: 0, + document_id: 0, + }; + store.put_blob(&kind, DATA).await.unwrap(); + assert_eq!( + String::from_utf8(store.get_blob(&kind, 0..u32::MAX).await.unwrap().unwrap()).unwrap(), + std::str::from_utf8(DATA).unwrap() + ); + assert_eq!( + String::from_utf8(store.get_blob(&kind, 11..57).await.unwrap().unwrap()).unwrap(), + std::str::from_utf8(&DATA[11..57]).unwrap() + ); + assert!(store.delete_blob(&kind).await.unwrap()); + assert!(store.get_blob(&kind, 0..u32::MAX).await.unwrap().is_none()); + + // Copy + let src_kind = BlobKind::LinkedMaildir { + account_id: 0, + document_id: 1, + }; + store.put_blob(&src_kind, DATA).await.unwrap(); + for id in 0..4 { + let dest_kind = BlobKind::LinkedMaildir { + account_id: 1, + document_id: id, + }; + assert!(store.copy_blob(&src_kind, &dest_kind, None).await.unwrap()); + + assert_eq!( + String::from_utf8( + store + .get_blob(&dest_kind, 0..u32::MAX) + .await + .unwrap() + .unwrap() + ) + .unwrap(), + std::str::from_utf8(DATA).unwrap() + ); + } + + // Copy partial + let tmp_kind = BlobKind::Temporary { + account_id: 1, + creation_year: 2020, + creation_month: 12, + creation_day: 31, + seq: 0, + }; + let tmp_kind2 = BlobKind::Temporary { + account_id: 1, + creation_year: 2021, + creation_month: 1, + creation_day: 1, + seq: 0, + }; + assert!(store + .copy_blob(&src_kind, &tmp_kind, (0..11).into()) + .await + .unwrap()); + assert!(store + .copy_blob(&src_kind, &tmp_kind2, (0..11).into()) + .await + .unwrap()); + assert_eq!( + String::from_utf8( + store + .get_blob(&tmp_kind, 0..u32::MAX) + .await + .unwrap() + .unwrap() + ) + .unwrap(), + std::str::from_utf8(&DATA[0..11]).unwrap() + ); + + // Delete range + store + .bulk_delete_blob(&BlobKind::LinkedMaildir { + account_id: 1, + document_id: 0, + }) + .await + .unwrap(); + store.bulk_delete_blob(&tmp_kind).await.unwrap(); + + // Make sure the blobs are deleted + for id in 0..4 { + assert!(store + .get_blob( + &BlobKind::LinkedMaildir { + account_id: 1, + document_id: id, + }, + 0..u32::MAX + ) + .await + .unwrap() + .is_none()); + } + assert!(store + .get_blob(&tmp_kind, 0..u32::MAX) + .await + .unwrap() + .is_none()); + + // Make sure other blobs were not deleted + assert!(store + .get_blob(&src_kind, 0..u32::MAX) + .await + .unwrap() + .is_some()); + assert!(store + .get_blob(&tmp_kind2, 0..u32::MAX) + .await + .unwrap() + .is_some()); + + // Copying a non-existing blob should fail + assert!(!store.copy_blob(&tmp_kind, &src_kind, None).await.unwrap()); + + // Copy blob between buckets + assert!(store + .copy_blob(&src_kind, &tmp_kind, (10..20).into()) + .await + .unwrap()); + assert_eq!( + String::from_utf8( + store + .get_blob(&tmp_kind, 0..u32::MAX) + .await + .unwrap() + .unwrap() + ) + .unwrap(), + std::str::from_utf8(&DATA[10..20]).unwrap() + ); + + // Delete blobs + for blob_kind in [src_kind, tmp_kind, tmp_kind2] { + assert!(store.delete_blob(&blob_kind).await.unwrap()); + assert!(store + .get_blob(&blob_kind, 0..u32::MAX) + .await + .unwrap() + .is_none()); + } +} diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index 2db77674..c97ebdba 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -22,6 +22,7 @@ */ pub mod assign_id; +pub mod blob; pub mod query; use std::{io::Read, sync::Arc}; @@ -39,7 +40,8 @@ pub async fn store_tests() { let temp_dir = TempDir::new("store_tests", insert); let config_file = format!( concat!( - "store.blob.path = \"{}\"\n", + "store.blob.type = \"local\"\n", + "store.blob.local.path = \"{}\"\n", "store.db.path = \"{}/sqlite.db\"\n" ), temp_dir.path.display(), @@ -53,7 +55,7 @@ pub async fn store_tests() { if insert { db.destroy().await; } - //assign_id::test(db).await; + assign_id::test(db.clone()).await; query::test(db, insert).await; temp_dir.delete(); }