diff --git a/crates/jmap/src/lib.rs b/crates/jmap/src/lib.rs index 0808467f..f8fe0b21 100644 --- a/crates/jmap/src/lib.rs +++ b/crates/jmap/src/lib.rs @@ -84,7 +84,7 @@ pub const LONG_SLUMBER: Duration = Duration::from_secs(60 * 60 * 24); pub struct JMAP { pub store: Store, - pub blob_store: Arc, + pub blob_store: BlobStore, pub config: Config, pub directory: Arc, @@ -202,11 +202,11 @@ impl JMAP { .await .failed("Unable to open database"), )), - blob_store: Arc::new( + blob_store: BlobStore::Fs(Arc::new( FsStore::open(config) .await .failed("Unable to open blob store"), - ), + )), config: Config::new(config).failed("Invalid configuration file"), sessions: TtlDashMap::with_capacity( config.property("jmap.session.cache.size")?.unwrap_or(100), diff --git a/crates/store/src/backend/foundationdb/id_assign.rs b/crates/store/src/backend/foundationdb/id_assign.rs index 8f8961ae..9a12886f 100644 --- a/crates/store/src/backend/foundationdb/id_assign.rs +++ b/crates/store/src/backend/foundationdb/id_assign.rs @@ -35,7 +35,7 @@ use crate::{ use super::{ bitmap::{next_available_index, BITS_PER_BLOCK}, - write::{ID_ASSIGNMENT_EXPIRY, MAX_COMMIT_TIME}, + write::MAX_COMMIT_TIME, FdbStore, }; @@ -85,8 +85,9 @@ impl FdbStore { #[cfg(not(feature = "test_mode"))] let expired_timestamp = now() - ID_ASSIGNMENT_EXPIRY; #[cfg(feature = "test_mode")] - let expired_timestamp = - now() - ID_ASSIGNMENT_EXPIRY.load(std::sync::atomic::Ordering::Relaxed); + let expired_timestamp = now() + - crate::backend::ID_ASSIGNMENT_EXPIRY + .load(std::sync::atomic::Ordering::Relaxed); while let Some(values) = values.next().await { for value in values? { let key = value.key(); diff --git a/crates/store/src/backend/foundationdb/write.rs b/crates/store/src/backend/foundationdb/write.rs index 92c1a447..e72cc03e 100644 --- a/crates/store/src/backend/foundationdb/write.rs +++ b/crates/store/src/backend/foundationdb/write.rs @@ -40,9 +40,6 @@ pub const MAX_COMMIT_ATTEMPTS: u32 = 10; #[cfg(not(feature = "test_mode"))] pub const MAX_COMMIT_TIME: Duration = Duration::from_secs(10); -#[cfg(feature = "test_mode")] -pub static ID_ASSIGNMENT_EXPIRY: std::sync::atomic::AtomicU64 = - std::sync::atomic::AtomicU64::new(60 * 60); // seconds #[cfg(feature = "test_mode")] pub const MAX_COMMIT_ATTEMPTS: u32 = 1000; #[cfg(feature = "test_mode")] diff --git a/crates/store/src/backend/fs/mod.rs b/crates/store/src/backend/fs/mod.rs index 61e671b6..5c633a0c 100644 --- a/crates/store/src/backend/fs/mod.rs +++ b/crates/store/src/backend/fs/mod.rs @@ -29,8 +29,6 @@ use tokio::{ }; use utils::{codec::base32_custom::Base32Writer, config::Config}; -use crate::BlobStore; - pub struct FsStore { path: PathBuf, hash_levels: usize, @@ -56,9 +54,12 @@ impl FsStore { } } -#[async_trait::async_trait] -impl BlobStore for FsStore { - async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>> { +impl FsStore { + pub(crate) async fn get_blob( + &self, + key: &[u8], + range: Range, + ) -> crate::Result>> { let blob_path = self.build_path(key); let blob_size = match fs::metadata(&blob_path).await { Ok(m) => m.len(), @@ -87,7 +88,7 @@ impl BlobStore for FsStore { })) } - async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { + pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { let blob_path = self.build_path(key); if fs::metadata(&blob_path) @@ -103,9 +104,9 @@ impl BlobStore for FsStore { Ok(()) } - async fn delete_blob(&self, key: &[u8]) -> crate::Result { + pub(crate) async fn delete_blob(&self, key: &[u8]) -> crate::Result { let blob_path = self.build_path(key); - if blob_path.exists() { + if fs::metadata(&blob_path).await.is_ok() { fs::remove_file(&blob_path).await?; Ok(true) } else { diff --git a/crates/store/src/backend/mod.rs b/crates/store/src/backend/mod.rs index bac9a2b8..050c5751 100644 --- a/crates/store/src/backend/mod.rs +++ b/crates/store/src/backend/mod.rs @@ -33,6 +33,10 @@ pub mod sqlite; pub(crate) const MAX_TOKEN_LENGTH: usize = (u8::MAX >> 2) as usize; pub(crate) const MAX_TOKEN_MASK: usize = MAX_TOKEN_LENGTH - 1; +#[cfg(feature = "test_mode")] +pub static ID_ASSIGNMENT_EXPIRY: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(60 * 60); // seconds + impl From for crate::Error { fn from(err: std::io::Error) -> Self { Self::InternalError(format!("IO error: {}", err)) diff --git a/crates/store/src/backend/s3/mod.rs b/crates/store/src/backend/s3/mod.rs index a1151c4a..9aa3bba8 100644 --- a/crates/store/src/backend/s3/mod.rs +++ b/crates/store/src/backend/s3/mod.rs @@ -30,8 +30,6 @@ use s3::{ }; use utils::{codec::base32_custom::Base32Writer, config::Config}; -use crate::BlobStore; - pub struct S3Store { bucket: Bucket, } @@ -67,11 +65,12 @@ impl S3Store { .with_request_timeout(timeout), }) } -} -#[async_trait::async_trait] -impl BlobStore for S3Store { - async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>> { + pub(crate) async fn get_blob( + &self, + key: &[u8], + range: Range, + ) -> crate::Result>> { let path = Base32Writer::from_bytes(key).finalize(); let response = if range.start != 0 || range.end != u32::MAX { self.bucket @@ -98,7 +97,7 @@ impl BlobStore for S3Store { } } - async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { + pub(crate) async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { match self .bucket .put_object(Base32Writer::from_bytes(key).finalize(), data) @@ -114,7 +113,7 @@ impl BlobStore for S3Store { } } - async fn delete_blob(&self, key: &[u8]) -> crate::Result { + pub(crate) async fn delete_blob(&self, key: &[u8]) -> crate::Result { self.bucket .delete_object(Base32Writer::from_bytes(key).finalize()) .await diff --git a/crates/store/src/dispatch.rs b/crates/store/src/dispatch.rs index c3fe937a..e7e59761 100644 --- a/crates/store/src/dispatch.rs +++ b/crates/store/src/dispatch.rs @@ -21,14 +21,14 @@ * for more details. */ -use std::ops::BitAndAssign; +use std::ops::{BitAndAssign, Range}; use roaring::RoaringBitmap; use crate::{ query, write::{Batch, BitmapClass, ValueClass}, - BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, + BitmapKey, BlobStore, Deserialize, IterateParams, Key, Store, ValueKey, }; impl Store { @@ -252,7 +252,7 @@ impl Store { } #[cfg(feature = "test_mode")] - pub async fn assert_is_empty(&self, blob_store: std::sync::Arc) { + pub async fn assert_is_empty(&self, blob_store: crate::BlobStore) { self.blob_hash_expire_all().await; self.blob_hash_purge(blob_store).await.unwrap(); self.purge_bitmaps().await.unwrap(); @@ -263,3 +263,26 @@ impl Store { } } } + +impl BlobStore { + pub async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>> { + match self { + Self::Fs(store) => store.get_blob(key, range).await, + Self::S3(store) => store.get_blob(key, range).await, + } + } + + pub async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()> { + match self { + Self::Fs(store) => store.put_blob(key, data).await, + Self::S3(store) => store.put_blob(key, data).await, + } + } + + pub async fn delete_blob(&self, key: &[u8]) -> crate::Result { + match self { + Self::Fs(store) => store.delete_blob(key).await, + Self::S3(store) => store.delete_blob(key).await, + } + } +} diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index bb66b46c..5dd85549 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -21,7 +21,7 @@ * for more details. */ -use std::{fmt::Display, ops::Range, sync::Arc}; +use std::{fmt::Display, sync::Arc}; pub mod backend; //pub mod fts; @@ -30,7 +30,7 @@ pub mod query; pub mod write; pub use ahash; -use backend::{foundationdb::FdbStore, sqlite::SqliteStore}; +use backend::{foundationdb::FdbStore, fs::FsStore, s3::S3Store, sqlite::SqliteStore}; pub use blake3; pub use parking_lot; pub use rand; @@ -169,14 +169,38 @@ pub struct IterateParams { values: bool, } -#[async_trait::async_trait] -pub trait BlobStore: Sync + Send { - async fn get_blob(&self, key: &[u8], range: Range) -> crate::Result>>; - async fn put_blob(&self, key: &[u8], data: &[u8]) -> crate::Result<()>; - async fn delete_blob(&self, key: &[u8]) -> crate::Result; -} - +#[derive(Clone)] pub enum Store { SQLite(Arc), FoundationDb(Arc), } + +#[derive(Clone)] +pub enum BlobStore { + Fs(Arc), + S3(Arc), +} + +impl From for Store { + fn from(store: SqliteStore) -> Self { + Self::SQLite(Arc::new(store)) + } +} + +impl From for Store { + fn from(store: FdbStore) -> Self { + Self::FoundationDb(Arc::new(store)) + } +} + +impl From for BlobStore { + fn from(store: FsStore) -> Self { + Self::Fs(Arc::new(store)) + } +} + +impl From for BlobStore { + fn from(store: S3Store) -> Self { + Self::S3(Arc::new(store)) + } +} diff --git a/crates/store/src/write/blob.rs b/crates/store/src/write/blob.rs index bc0e9e0e..d69b7ae8 100644 --- a/crates/store/src/write/blob.rs +++ b/crates/store/src/write/blob.rs @@ -21,8 +21,6 @@ * for more details. */ -use std::sync::Arc; - use ahash::AHashSet; use crate::{ @@ -32,7 +30,7 @@ use crate::{ use super::{key::DeserializeBigEndian, now, BlobOp}; -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] pub struct BlobQuota { pub bytes: usize, pub count: usize, @@ -177,7 +175,7 @@ impl Store { Ok(has_access) } - pub async fn blob_hash_purge(&self, blob_store: Arc) -> crate::Result<()> { + pub async fn blob_hash_purge(&self, blob_store: BlobStore) -> crate::Result<()> { // Remove expired temporary blobs let from_key = BlobKey { account_id: 0, diff --git a/tests/src/store/assign_id.rs b/tests/src/store/assign_id.rs index eed81298..15382a17 100644 --- a/tests/src/store/assign_id.rs +++ b/tests/src/store/assign_id.rs @@ -21,28 +21,27 @@ * for more details. */ -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, time::Duration}; use store::ahash::AHashSet; +use store::backend::ID_ASSIGNMENT_EXPIRY; use store::{write::BatchBuilder, Store}; -pub async fn test(db: Arc) { +pub async fn test(db: Store) { println!("Running Store ID assignment tests..."); - store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY - .store(2, std::sync::atomic::Ordering::Relaxed); + ID_ASSIGNMENT_EXPIRY.store(2, std::sync::atomic::Ordering::Relaxed); test_1(db.clone()).await; test_2(db.clone()).await; test_3(db.clone()).await; test_4(db).await; - store::backend::foundationdb::write::ID_ASSIGNMENT_EXPIRY - .store(60 * 60, std::sync::atomic::Ordering::Relaxed); + ID_ASSIGNMENT_EXPIRY.store(60 * 60, std::sync::atomic::Ordering::Relaxed); } -async fn test_1(db: Arc) { +async fn test_1(db: Store) { // Test change id assignment let mut handles = Vec::new(); let mut expected_ids = HashSet::new(); @@ -66,7 +65,7 @@ async fn test_1(db: Arc) { db.destroy().await; } -async fn test_2(db: Arc) { +async fn test_2(db: Store) { // Test document id assignment for wait_for_expiry in [true, false] { let mut handles = Vec::new(); @@ -102,7 +101,7 @@ async fn test_2(db: Arc) { db.destroy().await; } -async fn test_3(db: Arc) { +async fn test_3(db: Store) { // Create document ids and try reassigning let mut expected_ids = AHashSet::new(); let mut batch = BatchBuilder::new(); @@ -133,7 +132,7 @@ async fn test_3(db: Arc) { db.destroy().await; } -async fn test_4(db: Arc) { +async fn test_4(db: Store) { // Try reassigning deleted ids let mut expected_ids = AHashSet::new(); let mut batch = BatchBuilder::new(); diff --git a/tests/src/store/blob.rs b/tests/src/store/blob.rs index ed59ca55..2526f358 100644 --- a/tests/src/store/blob.rs +++ b/tests/src/store/blob.rs @@ -21,222 +21,383 @@ * for more details. */ -use store::{write::now, BlobHash, Store}; +use store::{ + backend::{fs::FsStore, s3::S3Store, sqlite::SqliteStore}, + write::{blob::BlobQuota, now, BatchBuilder, BlobOp, F_CLEAR}, + BlobClass, BlobHash, BlobStore, Store, +}; use utils::config::Config; use crate::store::TempDir; const CONFIG_S3: &str = r#" -[store.db] -path = "{TMP}/_blob_s3_test_delete.db?mode=rwc" - -[store.blob] -type = "s3" - [store.blob.s3] access-key = "minioadmin" secret-key = "minioadmin" region = "eu-central-1" endpoint = "http://localhost:9000" bucket = "tmp" - "#; const CONFIG_LOCAL: &str = r#" -[store.db] -path = "{TMP}/_blob_s3_test_delete.db?mode=rwc" - -[store.blob] -type = "local" - [store.blob.local] path = "{TMP}" - "#; -const DATA: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce erat nisl, dignissim a porttitor id, varius nec arcu. Sed mauris."; +const CONFIG_DB: &str = r#" +[store.db] +path = "{TMP}/db.db?mode=rwc" +"#; #[tokio::test] pub async fn blob_tests() { let temp_dir = TempDir::new("blob_tests", true); - /* test_blob( - Store::open( - &Config::new(&CONFIG_LOCAL.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())) - .unwrap(), - ) - .await - .unwrap(), - ) - .await; - test_blob( - Store::open( - &Config::new(&CONFIG_S3.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())) - .unwrap(), - ) - .await - .unwrap(), - ) - .await;*/ - temp_dir.delete(); -} + let mut blob_store = None; -/* -async fn test_blob(store: impl Store) { - // Obtain temp quota - let (quota_items, quota_bytes) = store.get_tmp_blob_usage(2, 100).await.unwrap(); - assert_eq!(quota_items, 0); - assert_eq!(quota_bytes, 0); - store.purge_tmp_blobs(0).await.unwrap(); + for (store_id, store_cfg) in [("s3", CONFIG_S3), ("fs", CONFIG_LOCAL)] { + let config = + Config::new(&store_cfg.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())) + .unwrap(); - // Store and fetch - let kind = BlobHash::LinkedMaildir { - account_id: 0, - document_id: 0, - }; - store.put_blob(&kind, DATA).await.unwrap(); - assert_eq!( - String::from_utf8(store.get_blob(&kind, 0..u32::MAX).await.unwrap().unwrap()).unwrap(), - std::str::from_utf8(DATA).unwrap() - ); - assert_eq!( - String::from_utf8(store.get_blob(&kind, 11..57).await.unwrap().unwrap()).unwrap(), - std::str::from_utf8(&DATA[11..57]).unwrap() - ); - assert!(store.delete_blob(&kind).await.unwrap()); - assert!(store.get_blob(&kind, 0..u32::MAX).await.unwrap().is_none()); - - // Copy - let src_kind = BlobHash::LinkedMaildir { - account_id: 0, - document_id: 1, - }; - store.put_blob(&src_kind, DATA).await.unwrap(); - for id in 0..4 { - let dest_kind = BlobHash::LinkedMaildir { - account_id: 1, - document_id: id, + let blob_store_: BlobStore = match store_id { + "fs" => FsStore::open(&config).await.unwrap().into(), + "s3" => S3Store::open(&config).await.unwrap().into(), + _ => unreachable!(), }; - assert!(store.copy_blob(&src_kind, &dest_kind, None).await.unwrap()); - assert_eq!( - String::from_utf8( - store - .get_blob(&dest_kind, 0..u32::MAX) - .await - .unwrap() - .unwrap() - ) + println!("Testing store {}...", store_id); + test_store(blob_store_.clone()).await; + blob_store = Some(blob_store_); + } + let blob_store = blob_store.unwrap(); + + // Start SQLite store + let store: Store = SqliteStore::open( + &Config::new(&CONFIG_DB.replace("{TMP}", temp_dir.path.as_path().to_str().unwrap())) .unwrap(), - std::str::from_utf8(DATA).unwrap() + ) + .await + .unwrap() + .into(); + + // Blob hash exists + let hash = BlobHash::from(b"abc".as_slice()); + assert!(!store.blob_hash_exists(&hash).await.unwrap()); + + // Reserve blob but mark it as expired + store + .write( + BatchBuilder::new() + .with_account_id(0) + .blob( + hash.clone(), + BlobOp::Reserve { + until: now() - 10, + size: 1024, + }, + 0, + ) + .build_batch(), + ) + .await + .unwrap(); + + // Uncommitted blob, should not exist + assert!(!store.blob_hash_exists(&hash).await.unwrap()); + + // Write blob to store + blob_store.put_blob(hash.as_ref(), b"abc").await.unwrap(); + + // Commit blob + store + .write( + BatchBuilder::new() + .blob(hash.clone(), BlobOp::Commit, 0) + .build_batch(), + ) + .await + .unwrap(); + + // Blob hash should now exist + assert!(store.blob_hash_exists(&hash).await.unwrap()); + + // AccountId 0 should be able to read blob + assert!(store + .blob_hash_can_read(&hash, BlobClass::Reserved { account_id: 0 }) + .await + .unwrap()); + + // AccountId 1 should not be able to read blob + assert!(!store + .blob_hash_can_read(&hash, BlobClass::Reserved { account_id: 1 }) + .await + .unwrap()); + + // Blob already expired, quota should be 0 + assert_eq!( + store.blob_hash_quota(0).await.unwrap(), + BlobQuota { bytes: 0, count: 0 } + ); + + // Purge expired blobs + store.blob_hash_purge(blob_store.clone()).await.unwrap(); + + // Blob hash should no longer exist + assert!(!store.blob_hash_exists(&hash).await.unwrap()); + + // AccountId 0 should not be able to read blob + assert!(!store + .blob_hash_can_read(&hash, BlobClass::Reserved { account_id: 0 }) + .await + .unwrap()); + + // Blob should no longer be in store + assert!(blob_store + .get_blob(hash.as_ref(), 0..u32::MAX) + .await + .unwrap() + .is_none()); + + // Upload one linked blob to accountId 1, two linked blobs to accountId 0, and three unlinked (reserved) blobs to accountId 2 + for (document_id, (blob, blob_op)) in [ + (b"123", BlobOp::Link), + (b"456", BlobOp::Link), + (b"789", BlobOp::Link), + ( + b"abc", + BlobOp::Reserve { + until: now() - 10, + size: 5000, + }, + ), + ( + b"efg", + BlobOp::Reserve { + until: now() + 10, + size: 1000, + }, + ), + ( + b"hij", + BlobOp::Reserve { + until: now() + 10, + size: 2000, + }, + ), + ] + .into_iter() + .enumerate() + { + let hash = BlobHash::from(blob.as_slice()); + store + .write( + BatchBuilder::new() + .with_account_id(if document_id > 0 { 0 } else { 1 }) + .with_collection(0) + .update_document(document_id as u32) + .blob(hash.clone(), blob_op, 0) + .blob(hash.clone(), BlobOp::Commit, 0) + .build_batch(), + ) + .await + .unwrap(); + blob_store + .put_blob(hash.as_ref(), blob.as_slice()) + .await + .unwrap(); + } + + // One of the reserved blobs expired and should not count towards quota + assert_eq!( + store.blob_hash_quota(0).await.unwrap(), + BlobQuota { + bytes: 3000, + count: 2 + } + ); + assert_eq!( + store.blob_hash_quota(1).await.unwrap(), + BlobQuota { bytes: 0, count: 0 } + ); + + // Purge expired blobs and make sure nothing else is deleted + store.blob_hash_purge(blob_store.clone()).await.unwrap(); + for (pos, (blob, blob_class)) in [ + (b"abc", BlobClass::Reserved { account_id: 0 }), + ( + b"123", + BlobClass::Linked { + account_id: 1, + collection: 0, + document_id: 0, + }, + ), + ( + b"456", + BlobClass::Linked { + account_id: 0, + collection: 0, + document_id: 1, + }, + ), + ( + b"789", + BlobClass::Linked { + account_id: 0, + collection: 0, + document_id: 2, + }, + ), + (b"efg", BlobClass::Reserved { account_id: 0 }), + (b"hij", BlobClass::Reserved { account_id: 0 }), + ] + .into_iter() + .enumerate() + { + let ct = pos == 0; + let hash = BlobHash::from(blob.as_slice()); + assert!(store.blob_hash_can_read(&hash, blob_class).await.unwrap() ^ ct); + assert!(store.blob_hash_exists(&hash).await.unwrap() ^ ct); + assert!( + blob_store + .get_blob(hash.as_ref(), 0..u32::MAX) + .await + .unwrap() + .is_some() + ^ ct ); } - // Copy partial - let now = now(); - let mut tmp_kinds = Vec::new(); - for i in 1..=3 { - let tmp_kind = BlobHash::Temporary { - account_id: 2, - timestamp: now - (i * 5), - seq: 0, - }; - assert!(store - .copy_blob(&src_kind, &tmp_kind, (0..11).into()) - .await - .unwrap()); - tmp_kinds.push(tmp_kind); - } - - assert_eq!( - String::from_utf8( - store - .get_blob(&tmp_kinds[0], 0..u32::MAX) - .await - .unwrap() - .unwrap() - ) - .unwrap(), - std::str::from_utf8(&DATA[0..11]).unwrap() - ); - - // Obtain temp quota - let (quota_items, quota_bytes) = store.get_tmp_blob_usage(2, 100).await.unwrap(); - assert_eq!(quota_items, 3); - assert_eq!(quota_bytes, 33); - let (quota_items, quota_bytes) = store.get_tmp_blob_usage(2, 12).await.unwrap(); - assert_eq!(quota_items, 2); - assert_eq!(quota_bytes, 22); - - // Delete range - store.delete_account_blobs(1).await.unwrap(); - store.purge_tmp_blobs(7).await.unwrap(); - - // Make sure the blobs are deleted - for id in 0..4 { - assert!(store - .get_blob( - &BlobHash::LinkedMaildir { - account_id: 1, - document_id: id, - }, - 0..u32::MAX - ) - .await - .unwrap() - .is_none()); - } - for i in [1, 2] { - assert!(store - .get_blob(&tmp_kinds[i], 0..u32::MAX) - .await - .unwrap() - .is_none()); - } - - // Make sure other blobs were not deleted - assert!(store - .get_blob(&src_kind, 0..u32::MAX) - .await - .unwrap() - .is_some()); - assert!(store - .get_blob(&tmp_kinds[0], 0..u32::MAX) - .await - .unwrap() - .is_some()); - - // Copying a non-existing blob should fail + // AccountId 0 should not have access to accountId 1's blobs assert!(!store - .copy_blob(&tmp_kinds[1], &src_kind, None) + .blob_hash_can_read( + BlobHash::from(b"123".as_slice()), + BlobClass::Linked { + account_id: 0, + collection: 0, + document_id: 0, + } + ) .await .unwrap()); - // Copy blob between buckets - assert!(store - .copy_blob(&src_kind, &tmp_kinds[0], (10..20).into()) + // Unlink blob + store + .write( + BatchBuilder::new() + .with_account_id(0) + .with_collection(0) + .update_document(2) + .blob(BlobHash::from(b"789".as_slice()), BlobOp::Link, F_CLEAR) + .build_batch(), + ) .await - .unwrap()); - assert_eq!( - String::from_utf8( - store - .get_blob(&tmp_kinds[0], 0..u32::MAX) + .unwrap(); + + // Purge and make sure blob is deleted + store.blob_hash_purge(blob_store.clone()).await.unwrap(); + for (pos, (blob, blob_class)) in [ + ( + b"789", + BlobClass::Linked { + account_id: 0, + collection: 0, + document_id: 2, + }, + ), + ( + b"123", + BlobClass::Linked { + account_id: 1, + collection: 0, + document_id: 0, + }, + ), + ( + b"456", + BlobClass::Linked { + account_id: 0, + collection: 0, + document_id: 1, + }, + ), + (b"efg", BlobClass::Reserved { account_id: 0 }), + (b"hij", BlobClass::Reserved { account_id: 0 }), + ] + .into_iter() + .enumerate() + { + let ct = pos == 0; + let hash = BlobHash::from(blob.as_slice()); + assert!(store.blob_hash_can_read(&hash, blob_class).await.unwrap() ^ ct); + assert!(store.blob_hash_exists(&hash).await.unwrap() ^ ct); + assert!( + blob_store + .get_blob(hash.as_ref(), 0..u32::MAX) .await .unwrap() - .unwrap() - ) - .unwrap(), - std::str::from_utf8(&DATA[10..20]).unwrap() - ); - - // Delete blobs - for blob_kind in [src_kind, tmp_kinds[0]] { - assert!(store.delete_blob(&blob_kind).await.unwrap()); - assert!(store - .get_blob(&blob_kind, 0..u32::MAX) - .await - .unwrap() - .is_none()); + .is_some() + ^ ct + ); } + + // Unlink all blobs from accountId 1 and purge + store.blob_hash_unlink_account(1).await.unwrap(); + store.blob_hash_purge(blob_store.clone()).await.unwrap(); + + // Make sure only accountId 0's blobs are left + for (pos, (blob, blob_class)) in [ + ( + b"123", + BlobClass::Linked { + account_id: 1, + collection: 0, + document_id: 0, + }, + ), + ( + b"456", + BlobClass::Linked { + account_id: 0, + collection: 0, + document_id: 1, + }, + ), + (b"efg", BlobClass::Reserved { account_id: 0 }), + (b"hij", BlobClass::Reserved { account_id: 0 }), + ] + .into_iter() + .enumerate() + { + let ct = pos == 0; + let hash = BlobHash::from(blob.as_slice()); + assert!(store.blob_hash_can_read(&hash, blob_class).await.unwrap() ^ ct); + assert!(store.blob_hash_exists(&hash).await.unwrap() ^ ct); + assert!( + blob_store + .get_blob(hash.as_ref(), 0..u32::MAX) + .await + .unwrap() + .is_some() + ^ ct + ); + } + + temp_dir.delete(); } -*/ +async fn test_store(store: BlobStore) { + const DATA: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce erat nisl, dignissim a porttitor id, varius nec arcu. Sed mauris."; + + store.put_blob(b"abc", DATA).await.unwrap(); + assert_eq!( + String::from_utf8(store.get_blob(b"abc", 0..u32::MAX).await.unwrap().unwrap()).unwrap(), + std::str::from_utf8(DATA).unwrap() + ); + assert_eq!( + String::from_utf8(store.get_blob(b"abc", 11..57).await.unwrap().unwrap()).unwrap(), + std::str::from_utf8(&DATA[11..57]).unwrap() + ); + assert!(store.delete_blob(b"abc").await.unwrap()); + assert!(store.get_blob(b"abc", 0..u32::MAX).await.unwrap().is_none()); +} diff --git a/tests/src/store/mod.rs b/tests/src/store/mod.rs index 1243cc63..bcf10172 100644 --- a/tests/src/store/mod.rs +++ b/tests/src/store/mod.rs @@ -21,7 +21,6 @@ * for more details. */ -#[cfg(feature = "foundationdb")] pub mod assign_id; pub mod blob; pub mod query; @@ -30,6 +29,7 @@ use std::{io::Read, sync::Arc}; use ::store::Store; +use store::backend::sqlite::SqliteStore; use utils::config::Config; pub struct TempDir { @@ -38,7 +38,7 @@ pub struct TempDir { #[tokio::test] pub async fn store_tests() { - /*let insert = true; + let insert = true; let temp_dir = TempDir::new("store_tests", insert); let config_file = format!( concat!( @@ -49,18 +49,16 @@ pub async fn store_tests() { temp_dir.path.display(), temp_dir.path.display() ); - let db = Arc::new( - Store::open(&Config::new(&config_file).unwrap()) - .await - .unwrap(), - ); + let db: Store = SqliteStore::open(&Config::new(&config_file).unwrap()) + .await + .unwrap() + .into(); if insert { db.destroy().await; } - #[cfg(feature = "foundationdb")] assign_id::test(db.clone()).await; query::test(db, insert).await; - temp_dir.delete();*/ + temp_dir.delete(); } pub fn deflate_artwork_data() -> Vec { diff --git a/tests/src/store/query.rs b/tests/src/store/query.rs index 6e0c3d32..0dd352da 100644 --- a/tests/src/store/query.rs +++ b/tests/src/store/query.rs @@ -94,7 +94,7 @@ const FIELDS_OPTIONS: [FieldType; 20] = [ ]; #[allow(clippy::mutex_atomic)] -pub async fn test(db: Arc, do_insert: bool) { +pub async fn test(db: Store, do_insert: bool) { println!("Running Store query tests..."); let pool = rayon::ThreadPoolBuilder::new() @@ -215,7 +215,7 @@ pub async fn test(db: Arc, do_insert: bool) { test_sort(db).await; } -pub async fn test_filter(db: Arc) { +pub async fn test_filter(db: Store) { /* let mut fields = AHashMap::default(); for (field_num, field) in FIELDS.iter().enumerate() { @@ -361,7 +361,7 @@ pub async fn test_filter(db: Arc) { */ } -pub async fn test_sort(db: Arc) { +pub async fn test_sort(db: Store) { let mut fields = AHashMap::default(); for (field_num, field) in FIELDS.iter().enumerate() { fields.insert(field.to_string(), field_num as u8);