mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-10-29 05:45:54 +08:00
Minio/S3 blob storage support.
This commit is contained in:
parent
c6e45a21e3
commit
ab895b2fae
10 changed files with 628 additions and 58 deletions
146
Cargo.lock
generated
146
Cargo.lock
generated
|
|
@ -227,6 +227,22 @@ dependencies = [
|
|||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "attohttpc"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1fcf00bc6d5abb29b5f97e3c61a90b6d3caa12f3faf897d4a3e3607c050a35a7"
|
||||
dependencies = [
|
||||
"http",
|
||||
"log",
|
||||
"rustls 0.20.8",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"url",
|
||||
"webpki",
|
||||
"webpki-roots 0.22.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atty"
|
||||
version = "0.2.14"
|
||||
|
|
@ -244,6 +260,32 @@ version = "1.1.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
|
||||
|
||||
[[package]]
|
||||
name = "aws-creds"
|
||||
version = "0.34.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3776743bb68d4ad02ba30ba8f64373f1be4e082fe47651767171ce75bb2f6cf5"
|
||||
dependencies = [
|
||||
"attohttpc",
|
||||
"dirs",
|
||||
"log",
|
||||
"quick-xml 0.26.0",
|
||||
"rust-ini",
|
||||
"serde",
|
||||
"thiserror",
|
||||
"time 0.3.21",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "aws-region"
|
||||
version = "0.25.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "056557a61427d0e5ba29dd931031c8ffed4ee7a550e7cd55692a9d8deb0a9dba"
|
||||
dependencies = [
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum"
|
||||
version = "0.6.18"
|
||||
|
|
@ -900,6 +942,26 @@ dependencies = [
|
|||
"utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs"
|
||||
version = "4.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ca3aa72a6f96ea37bbc5aa912f6788242832f75369bdfdadcb0e38423f100059"
|
||||
dependencies = [
|
||||
"dirs-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dirs-sys"
|
||||
version = "0.3.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b1d1d91c932ef41c0f2663aa8b0ca0342d444d842c06914aa0a7e352d0bada6"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"redox_users",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "displaydoc"
|
||||
version = "0.2.4"
|
||||
|
|
@ -911,6 +973,12 @@ dependencies = [
|
|||
"syn 2.0.18",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dlv-list"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
|
||||
|
||||
[[package]]
|
||||
name = "dotenvy"
|
||||
version = "0.15.7"
|
||||
|
|
@ -1824,7 +1892,7 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "jmap-client"
|
||||
version = "0.3.0"
|
||||
source = "git+https://github.com/stalwartlabs/jmap-client#3cba63c2246536c9b7997d7aea0124272af84ec8"
|
||||
source = "git+https://github.com/stalwartlabs/jmap-client#952f272d091f75d115edaf6364a006ac6a56490a"
|
||||
dependencies = [
|
||||
"ahash 0.8.3",
|
||||
"async-stream",
|
||||
|
|
@ -2043,7 +2111,7 @@ dependencies = [
|
|||
"mail-builder",
|
||||
"mail-parser",
|
||||
"parking_lot",
|
||||
"quick-xml",
|
||||
"quick-xml 0.28.2",
|
||||
"ring",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
|
|
@ -2514,6 +2582,16 @@ dependencies = [
|
|||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ordered-multimap"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ccd746e37177e1711c20dd619a1620f34f5c8b569c53590a72dedd5344d8924a"
|
||||
dependencies = [
|
||||
"dlv-list",
|
||||
"hashbrown 0.12.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "os_str_bytes"
|
||||
version = "6.5.0"
|
||||
|
|
@ -2906,6 +2984,16 @@ version = "1.2.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7f50b1c63b38611e7d4d7f68b82d3ad0cc71a2ad2e7f61fc10f1328d917c93cd"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "quick-xml"
|
||||
version = "0.28.2"
|
||||
|
|
@ -3005,6 +3093,17 @@ dependencies = [
|
|||
"bitflags 1.3.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_users"
|
||||
version = "0.4.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b033d837a7cf162d7993aded9304e30a83213c648b6e389db233191f891e5c2b"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
"redox_syscall 0.2.16",
|
||||
"thiserror",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "regex"
|
||||
version = "1.8.3"
|
||||
|
|
@ -3175,6 +3274,48 @@ dependencies = [
|
|||
"smallvec",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rust-ini"
|
||||
version = "0.18.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6d5f2436026b4f6e79dc829837d467cc7e9a55ee40e750d716713540715a2df"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"ordered-multimap",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rust-s3"
|
||||
version = "0.33.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b2ac5ff6acfbe74226fa701b5ef793aaa054055c13ebb7060ad36942956e027"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"aws-creds",
|
||||
"aws-region",
|
||||
"base64 0.13.1",
|
||||
"bytes",
|
||||
"cfg-if",
|
||||
"futures",
|
||||
"hex",
|
||||
"hmac 0.12.1",
|
||||
"http",
|
||||
"log",
|
||||
"maybe-async 0.2.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"md5",
|
||||
"percent-encoding",
|
||||
"quick-xml 0.26.0",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_derive",
|
||||
"sha2 0.10.6",
|
||||
"thiserror",
|
||||
"time 0.3.21",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rust-stemmers"
|
||||
version = "1.2.0"
|
||||
|
|
@ -3886,6 +4027,7 @@ dependencies = [
|
|||
"roaring",
|
||||
"rocksdb",
|
||||
"rusqlite",
|
||||
"rust-s3",
|
||||
"rust-stemmers",
|
||||
"serde",
|
||||
"siphasher",
|
||||
|
|
|
|||
|
|
@ -42,6 +42,12 @@ impl JMAP {
|
|||
document_id: 0,
|
||||
})
|
||||
.await?;
|
||||
self.store
|
||||
.bulk_delete_blob(&store::BlobKind::LinkedMaildir {
|
||||
account_id,
|
||||
document_id: 0,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Delete mailboxes
|
||||
let mut batch = BatchBuilder::new();
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ impl JMAP {
|
|||
}
|
||||
}
|
||||
|
||||
pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> Result<bool, MethodError> {
|
||||
pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> Result<(), MethodError> {
|
||||
self.store.put_blob(kind, data).await.map_err(|err| {
|
||||
tracing::error!(
|
||||
event = "error",
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ maybe-async = { path = "../maybe-async" }
|
|||
rocksdb = { version = "0.20.1", optional = true }
|
||||
foundationdb = { version = "0.7.0", optional = true }
|
||||
rusqlite = { version = "0.29.0", features = ["bundled"], optional = true }
|
||||
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"] }
|
||||
tokio = { version = "1.23", features = ["sync", "fs", "io-util"] }
|
||||
r2d2 = { version = "0.8.10", optional = true }
|
||||
futures = { version = "0.3", optional = true }
|
||||
|
|
|
|||
|
|
@ -24,22 +24,81 @@
|
|||
pub mod read;
|
||||
pub mod write;
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::{path::PathBuf, time::Duration};
|
||||
|
||||
use s3::{
|
||||
creds::{error::CredentialsError, Credentials},
|
||||
error::S3Error,
|
||||
Bucket, Region,
|
||||
};
|
||||
use utils::config::Config;
|
||||
|
||||
use crate::BlobKind;
|
||||
|
||||
pub enum BlobStore {
|
||||
Local(PathBuf),
|
||||
Remote(String),
|
||||
Local(BlobPaths),
|
||||
Remote(Bucket),
|
||||
}
|
||||
|
||||
pub struct BlobPaths {
|
||||
path_email: PathBuf,
|
||||
path_temporary: PathBuf,
|
||||
path_other: PathBuf,
|
||||
}
|
||||
|
||||
impl BlobStore {
|
||||
pub async fn new(config: &Config) -> crate::Result<Self> {
|
||||
Ok(BlobStore::Local(
|
||||
config.value_require("store.blob.path")?.into(),
|
||||
))
|
||||
match config.value_require("store.blob.type")? {
|
||||
"s3" | "minio" | "gcs" => {
|
||||
// Obtain region and endpoint from config
|
||||
let region = config.value_require("store.blob.s3.region")?;
|
||||
let region = if let Some(endpoint) = config.value("store.blob.s3.endpoint") {
|
||||
Region::Custom {
|
||||
region: region.to_string(),
|
||||
endpoint: endpoint.to_string(),
|
||||
}
|
||||
} else {
|
||||
region.parse().unwrap()
|
||||
};
|
||||
let credentials = Credentials::new(
|
||||
config.value("store.blob.s3.access-key"),
|
||||
config.value("store.blob.s3.secret-key"),
|
||||
config.value("store.blob.s3.security-token"),
|
||||
config.value("store.blob.s3.session-token"),
|
||||
config.value("store.blob.s3.profile"),
|
||||
)?;
|
||||
let timeout =
|
||||
config.property_or_static::<Duration>("store.blob.s3.timeout", "30s")?;
|
||||
|
||||
Ok(BlobStore::Remote(
|
||||
Bucket::new(
|
||||
config.value_require("store.blob.s3.bucket")?,
|
||||
region,
|
||||
credentials,
|
||||
)?
|
||||
.with_path_style()
|
||||
.with_request_timeout(timeout),
|
||||
))
|
||||
}
|
||||
"local" => {
|
||||
let path = config.property_require::<PathBuf>("store.blob.local.path")?;
|
||||
let mut path_email = path.clone();
|
||||
path_email.push("emails");
|
||||
let mut path_temporary = path.clone();
|
||||
path_temporary.push("tmp");
|
||||
let mut path_other = path;
|
||||
path_other.push("blobs");
|
||||
|
||||
Ok(BlobStore::Local(BlobPaths {
|
||||
path_email,
|
||||
path_temporary,
|
||||
path_other,
|
||||
}))
|
||||
}
|
||||
unknown => Err(crate::Error::InternalError(format!(
|
||||
"Unknown blob store type: {unknown}",
|
||||
))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -49,26 +108,41 @@ impl From<std::io::Error> for crate::Error {
|
|||
}
|
||||
}
|
||||
|
||||
fn get_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> {
|
||||
let mut path = base_path.to_path_buf();
|
||||
impl From<S3Error> for crate::Error {
|
||||
fn from(err: S3Error) -> Self {
|
||||
Self::InternalError(format!("S3 error: {}", err))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<CredentialsError> for crate::Error {
|
||||
fn from(err: CredentialsError) -> Self {
|
||||
Self::InternalError(format!("S3 Credentials error: {}", err))
|
||||
}
|
||||
}
|
||||
|
||||
fn get_local_path(base_path: &BlobPaths, kind: &BlobKind) -> PathBuf {
|
||||
match kind {
|
||||
BlobKind::LinkedMaildir {
|
||||
account_id,
|
||||
document_id,
|
||||
} => {
|
||||
let mut path = base_path.path_email.to_path_buf();
|
||||
path.push(format!("{:x}", account_id));
|
||||
path.push("Maildir");
|
||||
path.push("cur");
|
||||
path.push(format!("{:x}", document_id));
|
||||
path
|
||||
}
|
||||
BlobKind::Linked {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
} => {
|
||||
let mut path = base_path.path_other.to_path_buf();
|
||||
path.push(format!("{:x}", account_id));
|
||||
path.push(format!("{:x}", collection));
|
||||
path.push(format!("{:x}", document_id));
|
||||
}
|
||||
BlobKind::LinkedMaildir {
|
||||
account_id,
|
||||
document_id,
|
||||
} => {
|
||||
path.push(format!("{:x}", account_id));
|
||||
path.push("Maildir");
|
||||
path.push("cur");
|
||||
path.push(format!("{:x}", document_id));
|
||||
path
|
||||
}
|
||||
BlobKind::Temporary {
|
||||
account_id,
|
||||
|
|
@ -77,22 +151,27 @@ fn get_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> {
|
|||
creation_day,
|
||||
seq,
|
||||
} => {
|
||||
path.push("tmp");
|
||||
let mut path = base_path.path_temporary.to_path_buf();
|
||||
path.push(creation_year.to_string());
|
||||
path.push(creation_month.to_string());
|
||||
path.push(creation_day.to_string());
|
||||
path.push(format!("{:x}_{:x}", account_id, seq));
|
||||
path
|
||||
}
|
||||
}
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
fn get_root_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> {
|
||||
let mut path = base_path.to_path_buf();
|
||||
fn get_local_root_path(base_path: &BlobPaths, kind: &BlobKind) -> PathBuf {
|
||||
match kind {
|
||||
BlobKind::Linked { account_id, .. } | BlobKind::LinkedMaildir { account_id, .. } => {
|
||||
BlobKind::LinkedMaildir { account_id, .. } => {
|
||||
let mut path = base_path.path_email.to_path_buf();
|
||||
path.push(format!("{:x}", account_id));
|
||||
path
|
||||
}
|
||||
BlobKind::Linked { account_id, .. } => {
|
||||
let mut path = base_path.path_other.to_path_buf();
|
||||
path.push(format!("{:x}", account_id));
|
||||
path
|
||||
}
|
||||
BlobKind::Temporary {
|
||||
creation_year,
|
||||
|
|
@ -100,12 +179,55 @@ fn get_root_path(base_path: &Path, kind: &BlobKind) -> crate::Result<PathBuf> {
|
|||
creation_day,
|
||||
..
|
||||
} => {
|
||||
path.push("tmp");
|
||||
let mut path = base_path.path_temporary.to_path_buf();
|
||||
path.push(creation_year.to_string());
|
||||
path.push(creation_month.to_string());
|
||||
path.push(creation_day.to_string());
|
||||
path
|
||||
}
|
||||
}
|
||||
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
fn get_s3_path(kind: &BlobKind) -> String {
|
||||
match kind {
|
||||
BlobKind::LinkedMaildir {
|
||||
account_id,
|
||||
document_id,
|
||||
} => format!("/{:x}/{:x}", account_id, document_id),
|
||||
BlobKind::Linked {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
} => format!("/{:x}/{:x}/{:x}", account_id, collection, document_id),
|
||||
BlobKind::Temporary {
|
||||
account_id,
|
||||
creation_year,
|
||||
creation_month,
|
||||
creation_day,
|
||||
seq,
|
||||
} => format!(
|
||||
"/tmp/{}/{}/{}/{:x}_{:x}",
|
||||
creation_year, creation_month, creation_day, account_id, seq
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_s3_root_path(kind: &BlobKind) -> String {
|
||||
match kind {
|
||||
BlobKind::LinkedMaildir { account_id, .. } => {
|
||||
format!("/{:x}/", account_id)
|
||||
}
|
||||
BlobKind::Linked { account_id, .. } => {
|
||||
format!("/{:x}/", account_id)
|
||||
}
|
||||
BlobKind::Temporary {
|
||||
creation_year,
|
||||
creation_month,
|
||||
creation_day,
|
||||
..
|
||||
} => format!(
|
||||
"/tmp/{}/{}/{}/",
|
||||
creation_year, creation_month, creation_day
|
||||
),
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ use tokio::{
|
|||
|
||||
use crate::{BlobKind, Store};
|
||||
|
||||
use super::{get_path, BlobStore};
|
||||
use super::{get_local_path, get_s3_path, BlobStore};
|
||||
|
||||
impl Store {
|
||||
pub async fn get_blob(
|
||||
|
|
@ -40,7 +40,7 @@ impl Store {
|
|||
) -> crate::Result<Option<Vec<u8>>> {
|
||||
match &self.blob {
|
||||
BlobStore::Local(base_path) => {
|
||||
let blob_path = get_path(base_path, kind)?;
|
||||
let blob_path = get_local_path(base_path, kind);
|
||||
let blob_size = match fs::metadata(&blob_path).await {
|
||||
Ok(m) => m.len(),
|
||||
Err(_) => return Ok(None),
|
||||
|
|
@ -70,7 +70,32 @@ impl Store {
|
|||
buf
|
||||
}))
|
||||
}
|
||||
BlobStore::Remote(_) => todo!(),
|
||||
BlobStore::Remote(bucket) => {
|
||||
let path = get_s3_path(kind);
|
||||
let response = if range.start != 0 || range.end != u32::MAX {
|
||||
bucket
|
||||
.get_object_range(
|
||||
path,
|
||||
range.start as u64,
|
||||
Some(range.end.saturating_sub(1) as u64),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
bucket.get_object(path).await
|
||||
};
|
||||
match response {
|
||||
Ok(response) if (200..300).contains(&response.status_code()) => {
|
||||
Ok(Some(response.to_vec()))
|
||||
}
|
||||
Ok(response) if response.status_code() == 404 => Ok(None),
|
||||
Ok(response) => Err(crate::Error::InternalError(format!(
|
||||
"S3 error code {}: {}",
|
||||
response.status_code(),
|
||||
String::from_utf8_lossy(response.as_slice())
|
||||
))),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,22 +30,33 @@ use tokio::{
|
|||
|
||||
use crate::{BlobKind, Store};
|
||||
|
||||
use super::{get_path, get_root_path, BlobStore};
|
||||
use super::{get_local_path, get_local_root_path, get_s3_path, get_s3_root_path, BlobStore};
|
||||
|
||||
impl Store {
|
||||
pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result<bool> {
|
||||
pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result<()> {
|
||||
match &self.blob {
|
||||
BlobStore::Local(base_path) => {
|
||||
let blob_path = get_path(base_path, kind)?;
|
||||
let blob_path = get_local_path(base_path, kind);
|
||||
|
||||
fs::create_dir_all(blob_path.parent().unwrap()).await?;
|
||||
let mut blob_file = File::create(&blob_path).await?;
|
||||
blob_file.write_all(data).await?;
|
||||
blob_file.flush().await?;
|
||||
|
||||
Ok(true)
|
||||
Ok(())
|
||||
}
|
||||
BlobStore::Remote(bucket) => {
|
||||
let path = get_s3_path(kind);
|
||||
match bucket.put_object(path, data).await {
|
||||
Ok(response) if (200..300).contains(&response.status_code()) => Ok(()),
|
||||
Ok(response) => Err(crate::Error::InternalError(format!(
|
||||
"S3 error code {}: {}",
|
||||
response.status_code(),
|
||||
String::from_utf8_lossy(response.as_slice())
|
||||
))),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
BlobStore::Remote(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -55,20 +66,19 @@ impl Store {
|
|||
dest: &BlobKind,
|
||||
range: Option<Range<u32>>,
|
||||
) -> crate::Result<bool> {
|
||||
match &self.blob {
|
||||
BlobStore::Local(base_path) => {
|
||||
let dest_path = get_path(base_path, dest)?;
|
||||
if let Some(range) = range {
|
||||
if let Some(bytes) = self.get_blob(src, range).await? {
|
||||
self.put_blob(dest, &bytes).await?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
match &self.blob {
|
||||
BlobStore::Local(base_path) => {
|
||||
let dest_path = get_local_path(base_path, dest);
|
||||
let src_path = get_local_path(base_path, src);
|
||||
|
||||
if let Some(range) = range {
|
||||
if let Some(bytes) = self.get_blob(src, range).await? {
|
||||
fs::create_dir_all(dest_path.parent().unwrap()).await?;
|
||||
fs::write(dest_path, bytes).await?;
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
} else {
|
||||
let src_path = get_path(base_path, src)?;
|
||||
if fs::metadata(&src_path).await.is_ok() {
|
||||
fs::create_dir_all(dest_path.parent().unwrap()).await?;
|
||||
fs::copy(src_path, dest_path).await?;
|
||||
|
|
@ -77,15 +87,24 @@ impl Store {
|
|||
Ok(false)
|
||||
}
|
||||
}
|
||||
BlobStore::Remote(bucket) => {
|
||||
let src_path = get_s3_path(src);
|
||||
let dest_path = get_s3_path(dest);
|
||||
|
||||
bucket
|
||||
.copy_object_internal(src_path, dest_path)
|
||||
.await
|
||||
.map(|code| (200..300).contains(&code))
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
BlobStore::Remote(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_blob(&self, kind: &BlobKind) -> crate::Result<bool> {
|
||||
match &self.blob {
|
||||
BlobStore::Local(base_path) => {
|
||||
let blob_path = get_path(base_path, kind)?;
|
||||
let blob_path = get_local_path(base_path, kind);
|
||||
|
||||
if blob_path.exists() {
|
||||
fs::remove_file(&blob_path).await?;
|
||||
|
|
@ -94,16 +113,52 @@ impl Store {
|
|||
Ok(false)
|
||||
}
|
||||
}
|
||||
BlobStore::Remote(_) => todo!(),
|
||||
BlobStore::Remote(bucket) => {
|
||||
let path = get_s3_path(kind);
|
||||
bucket
|
||||
.delete_object(path)
|
||||
.await
|
||||
.map(|response| (200..300).contains(&response.status_code()))
|
||||
.map_err(|e| e.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn bulk_delete_blob(&self, kind: &BlobKind) -> crate::Result<()> {
|
||||
match &self.blob {
|
||||
BlobStore::Local(base_path) => fs::remove_dir_all(get_root_path(base_path, kind)?)
|
||||
BlobStore::Local(base_path) => fs::remove_dir_all(get_local_root_path(base_path, kind))
|
||||
.await
|
||||
.map_err(Into::into),
|
||||
BlobStore::Remote(_) => todo!(),
|
||||
BlobStore::Remote(bucket) => {
|
||||
let prefix = get_s3_root_path(kind);
|
||||
let prefix_base = prefix.strip_prefix('/').unwrap();
|
||||
let mut is_truncated = true;
|
||||
while is_truncated {
|
||||
for item in bucket.list(prefix.clone(), None).await? {
|
||||
is_truncated = item.is_truncated && !item.contents.is_empty();
|
||||
for object in item.contents {
|
||||
if object.key.starts_with(&prefix)
|
||||
|| object.key.starts_with(prefix_base)
|
||||
{
|
||||
let result = bucket.delete_object(object.key).await?;
|
||||
if !(200..300).contains(&result.status_code()) {
|
||||
return Err(crate::Error::InternalError(format!(
|
||||
"Failed to delete bucket item, code {}: {}",
|
||||
result.status_code(),
|
||||
String::from_utf8_lossy(result.as_slice())
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
"Unexpected S3 object while deleting: {}",
|
||||
item.name
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -127,7 +127,12 @@ future-release = [ { if = "authenticated-as", ne = "", then = "99999999d"},
|
|||
|
||||
[store]
|
||||
db.path = "{TMP}/sqlite.db"
|
||||
blob.path = "{TMP}"
|
||||
|
||||
[store.blob]
|
||||
type = "local"
|
||||
|
||||
[store.blob.local]
|
||||
path = "{TMP}"
|
||||
|
||||
[certificate.default]
|
||||
cert = "file://{CERT}"
|
||||
|
|
|
|||
212
tests/src/store/blob.rs
Normal file
212
tests/src/store/blob.rs
Normal file
|
|
@ -0,0 +1,212 @@
|
|||
use store::{BlobKind, Store};
|
||||
use utils::config::Config;
|
||||
|
||||
use crate::store::TempDir;
|
||||
|
||||
const CONFIG_S3: &str = r#"
|
||||
[store.db]
|
||||
path = "{TMP}/_blob_s3_test_delete.db?mode=rwc"
|
||||
|
||||
[store.blob]
|
||||
type = "s3"
|
||||
|
||||
[store.blob.s3]
|
||||
access-key = "minioadmin"
|
||||
secret-key = "minioadmin"
|
||||
region = "eu-central-1"
|
||||
endpoint = "http://localhost:9000"
|
||||
bucket = "tmp"
|
||||
|
||||
"#;
|
||||
|
||||
const CONFIG_LOCAL: &str = r#"
|
||||
[store.db]
|
||||
path = "{TMP}/_blob_s3_test_delete.db?mode=rwc"
|
||||
|
||||
[store.blob]
|
||||
type = "local"
|
||||
|
||||
[store.blob.local]
|
||||
path = "{TMP}"
|
||||
|
||||
"#;
|
||||
|
||||
const DATA: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce erat nisl, dignissim a porttitor id, varius nec arcu. Sed mauris.";
|
||||
|
||||
#[tokio::test]
|
||||
pub async fn blob_s3_test() {
|
||||
let temp_dir = TempDir::new("blob_tests", true);
|
||||
test_blob(
|
||||
Store::open(
|
||||
&Config::parse(
|
||||
&CONFIG_LOCAL.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap()),
|
||||
)
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
test_blob(
|
||||
Store::open(
|
||||
&Config::parse(&CONFIG_S3.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap()))
|
||||
.unwrap(),
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
)
|
||||
.await;
|
||||
temp_dir.delete();
|
||||
}
|
||||
|
||||
async fn test_blob(store: Store) {
|
||||
// Store and fetch
|
||||
let kind = BlobKind::LinkedMaildir {
|
||||
account_id: 0,
|
||||
document_id: 0,
|
||||
};
|
||||
store.put_blob(&kind, DATA).await.unwrap();
|
||||
assert_eq!(
|
||||
String::from_utf8(store.get_blob(&kind, 0..u32::MAX).await.unwrap().unwrap()).unwrap(),
|
||||
std::str::from_utf8(DATA).unwrap()
|
||||
);
|
||||
assert_eq!(
|
||||
String::from_utf8(store.get_blob(&kind, 11..57).await.unwrap().unwrap()).unwrap(),
|
||||
std::str::from_utf8(&DATA[11..57]).unwrap()
|
||||
);
|
||||
assert!(store.delete_blob(&kind).await.unwrap());
|
||||
assert!(store.get_blob(&kind, 0..u32::MAX).await.unwrap().is_none());
|
||||
|
||||
// Copy
|
||||
let src_kind = BlobKind::LinkedMaildir {
|
||||
account_id: 0,
|
||||
document_id: 1,
|
||||
};
|
||||
store.put_blob(&src_kind, DATA).await.unwrap();
|
||||
for id in 0..4 {
|
||||
let dest_kind = BlobKind::LinkedMaildir {
|
||||
account_id: 1,
|
||||
document_id: id,
|
||||
};
|
||||
assert!(store.copy_blob(&src_kind, &dest_kind, None).await.unwrap());
|
||||
|
||||
assert_eq!(
|
||||
String::from_utf8(
|
||||
store
|
||||
.get_blob(&dest_kind, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
)
|
||||
.unwrap(),
|
||||
std::str::from_utf8(DATA).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
// Copy partial
|
||||
let tmp_kind = BlobKind::Temporary {
|
||||
account_id: 1,
|
||||
creation_year: 2020,
|
||||
creation_month: 12,
|
||||
creation_day: 31,
|
||||
seq: 0,
|
||||
};
|
||||
let tmp_kind2 = BlobKind::Temporary {
|
||||
account_id: 1,
|
||||
creation_year: 2021,
|
||||
creation_month: 1,
|
||||
creation_day: 1,
|
||||
seq: 0,
|
||||
};
|
||||
assert!(store
|
||||
.copy_blob(&src_kind, &tmp_kind, (0..11).into())
|
||||
.await
|
||||
.unwrap());
|
||||
assert!(store
|
||||
.copy_blob(&src_kind, &tmp_kind2, (0..11).into())
|
||||
.await
|
||||
.unwrap());
|
||||
assert_eq!(
|
||||
String::from_utf8(
|
||||
store
|
||||
.get_blob(&tmp_kind, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
)
|
||||
.unwrap(),
|
||||
std::str::from_utf8(&DATA[0..11]).unwrap()
|
||||
);
|
||||
|
||||
// Delete range
|
||||
store
|
||||
.bulk_delete_blob(&BlobKind::LinkedMaildir {
|
||||
account_id: 1,
|
||||
document_id: 0,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
store.bulk_delete_blob(&tmp_kind).await.unwrap();
|
||||
|
||||
// Make sure the blobs are deleted
|
||||
for id in 0..4 {
|
||||
assert!(store
|
||||
.get_blob(
|
||||
&BlobKind::LinkedMaildir {
|
||||
account_id: 1,
|
||||
document_id: id,
|
||||
},
|
||||
0..u32::MAX
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
}
|
||||
assert!(store
|
||||
.get_blob(&tmp_kind, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
|
||||
// Make sure other blobs were not deleted
|
||||
assert!(store
|
||||
.get_blob(&src_kind, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some());
|
||||
assert!(store
|
||||
.get_blob(&tmp_kind2, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_some());
|
||||
|
||||
// Copying a non-existing blob should fail
|
||||
assert!(!store.copy_blob(&tmp_kind, &src_kind, None).await.unwrap());
|
||||
|
||||
// Copy blob between buckets
|
||||
assert!(store
|
||||
.copy_blob(&src_kind, &tmp_kind, (10..20).into())
|
||||
.await
|
||||
.unwrap());
|
||||
assert_eq!(
|
||||
String::from_utf8(
|
||||
store
|
||||
.get_blob(&tmp_kind, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
)
|
||||
.unwrap(),
|
||||
std::str::from_utf8(&DATA[10..20]).unwrap()
|
||||
);
|
||||
|
||||
// Delete blobs
|
||||
for blob_kind in [src_kind, tmp_kind, tmp_kind2] {
|
||||
assert!(store.delete_blob(&blob_kind).await.unwrap());
|
||||
assert!(store
|
||||
.get_blob(&blob_kind, 0..u32::MAX)
|
||||
.await
|
||||
.unwrap()
|
||||
.is_none());
|
||||
}
|
||||
}
|
||||
|
|
@ -22,6 +22,7 @@
|
|||
*/
|
||||
|
||||
pub mod assign_id;
|
||||
pub mod blob;
|
||||
pub mod query;
|
||||
|
||||
use std::{io::Read, sync::Arc};
|
||||
|
|
@ -39,7 +40,8 @@ pub async fn store_tests() {
|
|||
let temp_dir = TempDir::new("store_tests", insert);
|
||||
let config_file = format!(
|
||||
concat!(
|
||||
"store.blob.path = \"{}\"\n",
|
||||
"store.blob.type = \"local\"\n",
|
||||
"store.blob.local.path = \"{}\"\n",
|
||||
"store.db.path = \"{}/sqlite.db\"\n"
|
||||
),
|
||||
temp_dir.path.display(),
|
||||
|
|
@ -53,7 +55,7 @@ pub async fn store_tests() {
|
|||
if insert {
|
||||
db.destroy().await;
|
||||
}
|
||||
//assign_id::test(db).await;
|
||||
assign_id::test(db.clone()).await;
|
||||
query::test(db, insert).await;
|
||||
temp_dir.delete();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue