Disk quotas support.

This commit is contained in:
Mauro D 2023-06-08 12:53:29 +00:00
parent ab895b2fae
commit 891d39940d
44 changed files with 1169 additions and 532 deletions

332 lines changed
Cargo.lock generated
View file

@ -84,9 +84,9 @@ dependencies = [
[[package]]
name = "aho-corasick"
version = "1.0.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04"
checksum = "43f6cb1bf222025340178f382c426f13757b2960e89779dfcb319c32542a5a41"
dependencies = [
"memchr",
]
@ -243,17 +243,6 @@ dependencies = [
"webpki-roots 0.22.6",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi 0.1.19",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -383,29 +372,6 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.60.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "062dddbc1ba4aca46de6338e2bf87771414c335f7b2f2036e8f3e9befebf88e6"
dependencies = [
"bitflags 1.3.2",
"cexpr",
"clang-sys",
"clap",
"env_logger",
"lazy_static",
"lazycell",
"log",
"peeking_take_while",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"which",
]
[[package]]
name = "bindgen"
version = "0.64.0"
@ -426,6 +392,29 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "bindgen"
version = "0.65.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfdf7b466f9a4903edc73f95d6d2bcd5baf8ae620638762244d3f60143643cc5"
dependencies = [
"bitflags 1.3.2",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"log",
"peeking_take_while",
"prettyplease 0.2.6",
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"shlex",
"syn 2.0.18",
"which",
]
[[package]]
name = "bitflags"
version = "1.3.2"
@ -623,30 +612,6 @@ dependencies = [
"libloading",
]
[[package]]
name = "clap"
version = "3.2.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ea181bf566f71cb9a5d17a59e1871af638180a18fb0035c92ae62b705207123"
dependencies = [
"atty",
"bitflags 1.3.2",
"clap_lex",
"indexmap",
"strsim",
"termcolor",
"textwrap",
]
[[package]]
name = "clap_lex"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2850f2f5a82cbf437dd5af4d49848fbdfc27c157c3d010345776f952765261c5"
dependencies = [
"os_str_bytes",
]
[[package]]
name = "const-oid"
version = "0.9.2"
@ -1068,19 +1033,6 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "env_logger"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a12e6657c4c97ebab115a42dcee77225f7f482cdd841cf7088c657a42e9e00e7"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "errno"
version = "0.3.1"
@ -1230,18 +1182,18 @@ dependencies = [
[[package]]
name = "form_urlencoded"
version = "1.1.0"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8"
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
dependencies = [
"percent-encoding",
]
[[package]]
name = "foundationdb"
version = "0.7.0"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69adb701525370e5f8958454b46e8459b276d81ce6391edbf84eae32eeddff75"
checksum = "8696fd1be198f101eb58aeecf0f504fc02b28c7afcc008b4e4a998a91b305108"
dependencies = [
"async-recursion",
"async-trait",
@ -1251,36 +1203,41 @@ dependencies = [
"futures",
"memchr",
"rand",
"serde",
"serde_bytes",
"serde_json",
"static_assertions",
"uuid",
]
[[package]]
name = "foundationdb-gen"
version = "0.7.0"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134e1c986a2bb78904f426d4924a55e8c14162ba764e229501eb6f95c8c37489"
checksum = "62239700f01b041b6372aaeb847c52f960e1a69fd2b1025dc995ea3dd90e3308"
dependencies = [
"xml-rs",
]
[[package]]
name = "foundationdb-macros"
version = "0.1.1"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2163c7326208be8edc605e10303ec6ae45cf106c12540754a9970bcce0f80cae"
checksum = "83c8d52fe8b46ab822b4decdcc0d6d85aeedfc98f0d52ba2bd4aec4a97807516"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
"syn 2.0.18",
"try_map",
]
[[package]]
name = "foundationdb-sys"
version = "0.7.0"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3eb26eee771096794dbee1a2a9defa455443a2c150a810386331aa0d6603d356"
checksum = "98e49545f5393d276b7b888c77e3f9519fd33727435f8244344be72c3284256f"
dependencies = [
"bindgen 0.60.1",
"bindgen 0.65.1",
]
[[package]]
@ -1410,14 +1367,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0176e0459c2e4a1fe232f984bca6890e681076abb9934f6cea7c326f3fc47818"
dependencies = [
"libc",
"windows-targets 0.48.0",
"windows-targets",
]
[[package]]
name = "getrandom"
version = "0.2.9"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [
"cfg-if",
"libc",
@ -1506,15 +1463,6 @@ dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33"
dependencies = [
"libc",
]
[[package]]
name = "hermit-abi"
version = "0.2.6"
@ -1641,12 +1589,6 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.26"
@ -1664,7 +1606,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.4.9",
"tokio",
"tower-service",
"tracing",
@ -1720,9 +1662,9 @@ dependencies = [
[[package]]
name = "iana-time-zone"
version = "0.1.56"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0722cd7114b7de04316e7ea5456a0bbb20e4adb46fd27a3697adb812cff0f37c"
checksum = "2fad5b825842d2b38bd206f3e81d6957625fd7f0a361e345c30e01a0ae2dd613"
dependencies = [
"android_system_properties",
"core-foundation-sys",
@ -1754,9 +1696,9 @@ dependencies = [
[[package]]
name = "idna"
version = "0.3.0"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
@ -1803,14 +1745,14 @@ dependencies = [
[[package]]
name = "ipconfig"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd302af1b90f2463a98fa5ad469fc212c8e3175a41c3068601bfa2727591c5be"
checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f"
dependencies = [
"socket2",
"socket2 0.5.3",
"widestring",
"winapi",
"winreg",
"windows-sys 0.48.0",
"winreg 0.50.0",
]
[[package]]
@ -1892,7 +1834,7 @@ dependencies = [
[[package]]
name = "jmap-client"
version = "0.3.0"
source = "git+https://github.com/stalwartlabs/jmap-client#952f272d091f75d115edaf6364a006ac6a56490a"
source = "git+https://github.com/stalwartlabs/jmap-client#5434b2d26e7285a89146b708a710962b096b05bf"
dependencies = [
"ahash 0.8.3",
"async-stream",
@ -1995,9 +1937,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.144"
version = "0.2.146"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b00cc1c228a6782d0f076e7b232802e0c5689d41bb5df366f2a6b6621cfdfe1"
checksum = "f92be4933c13fd498862a9e02a3055f8a8d9c039ce33db97306fd5a6caa7f29b"
[[package]]
name = "libloading"
@ -2067,9 +2009,9 @@ checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519"
[[package]]
name = "lock_api"
version = "0.4.9"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435011366fe56583b16cf956f9df0095b405b82d76425bc8981c0e22e60ec4df"
checksum = "c1cc9717a20b1bb222f333e6a92fd32f7d8a18ddc5a3191a11af45dcbf4dcd16"
dependencies = [
"autocfg",
"scopeguard",
@ -2424,9 +2366,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.17.2"
version = "1.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9670a07f94779e00908f3e686eab508878ebb390ba6e604d3a284c00e8d0487b"
checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d"
[[package]]
name = "opaque-debug"
@ -2592,12 +2534,6 @@ dependencies = [
"hashbrown 0.12.3",
]
[[package]]
name = "os_str_bytes"
version = "6.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267"
[[package]]
name = "overload"
version = "0.1.1"
@ -2628,15 +2564,15 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.9.7"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9069cbb9f99e3a5083476ccb29ceb1de18b9118cafa53e90c9551235de2b9521"
checksum = "93f00c865fe7cabf650081affecd3871070f26767e7b2070a3ffae14c654b447"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.2.16",
"redox_syscall 0.3.5",
"smallvec",
"windows-sys 0.45.0",
"windows-targets",
]
[[package]]
@ -2717,9 +2653,9 @@ dependencies = [
[[package]]
name = "percent-encoding"
version = "2.2.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
[[package]]
name = "petgraph"
@ -2881,6 +2817,16 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "prettyplease"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b69d39aab54d069e7f2fe8cb970493e7834601ca2d8c65fd7bbd183578080d1"
dependencies = [
"proc-macro2",
"syn 2.0.18",
]
[[package]]
name = "primeorder"
version = "0.13.2"
@ -2932,7 +2878,7 @@ dependencies = [
"log",
"multimap",
"petgraph",
"prettyplease",
"prettyplease 0.1.25",
"prost",
"prost-types",
"regex",
@ -3106,9 +3052,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.3"
version = "1.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81ca098a9821bd52d6b24fd8b10bd081f47d39c22778cafaa75a2857a62c6390"
checksum = "d0ab3ca65655bb1e41f2a8c8cd662eb4fb035e67c3f78da1d61dffe89d07300f"
dependencies = [
"aho-corasick",
"memchr",
@ -3175,7 +3121,7 @@ dependencies = [
"wasm-streams",
"web-sys",
"webpki-roots 0.22.6",
"winreg",
"winreg 0.10.1",
]
[[package]]
@ -3523,6 +3469,15 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "serde_bytes"
version = "0.11.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "416bda436f9aab92e02c8e10d49a15ddd339cea90b6e340fe51ed97abb548294"
dependencies = [
"serde",
]
[[package]]
name = "serde_derive"
version = "1.0.163"
@ -3754,6 +3709,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877"
dependencies = [
"libc",
"windows-sys 0.48.0",
]
[[package]]
name = "spin"
version = "0.5.2"
@ -4049,12 +4014,6 @@ dependencies = [
"unicode-normalization",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "subtle"
version = "2.4.1"
@ -4103,24 +4062,16 @@ dependencies = [
[[package]]
name = "tempfile"
version = "3.5.0"
version = "3.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9fbec84f381d5795b08656e4912bec604d162bff9291d6189a78f4c8ab87998"
checksum = "31c0432476357e58790aaa47a8efb0c5138f137343f3b5f23bd36a27e3b0a6d6"
dependencies = [
"autocfg",
"cfg-if",
"fastrand",
"redox_syscall 0.3.5",
"rustix",
"windows-sys 0.45.0",
]
[[package]]
name = "termcolor"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be55cf8942feac5c765c2c993422806843c9a9a45d4d5c407ad6dd2ea95eb9b6"
dependencies = [
"winapi-util",
"windows-sys 0.48.0",
]
[[package]]
@ -4166,12 +4117,6 @@ dependencies = [
"utils",
]
[[package]]
name = "textwrap"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.40"
@ -4279,7 +4224,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.4.9",
"tokio-macros",
"windows-sys 0.48.0",
]
@ -4404,7 +4349,7 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease",
"prettyplease 0.1.25",
"proc-macro2",
"prost-build",
"quote",
@ -4600,6 +4545,12 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "try_map"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb1626d07cb5c1bb2cf17d94c0be4852e8a7c02b041acec9a8c5bdda99f9d580"
[[package]]
name = "tungstenite"
version = "0.19.0"
@ -4684,12 +4635,12 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.3.1"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643"
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
dependencies = [
"form_urlencoded",
"idna 0.3.0",
"idna 0.4.0",
"percent-encoding",
]
@ -4913,9 +4864,9 @@ checksum = "2c70234412ca409cc04e864e89523cb0fc37f5e1344ebed5a3ebf4192b6b9f68"
[[package]]
name = "widestring"
version = "0.5.1"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17882f045410753661207383517a6f62ec3dbeb6a4ed2acce01f0728238d1983"
checksum = "653f141f39ec16bba3c5abe400a0c60da7468261cc2cbf36805022876bc721a8"
[[package]]
name = "winapi"
@ -4933,15 +4884,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
@ -4954,7 +4896,7 @@ version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f"
dependencies = [
"windows-targets 0.48.0",
"windows-targets",
]
[[package]]
@ -4972,37 +4914,13 @@ dependencies = [
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9"
dependencies = [
"windows-targets 0.48.0",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
"windows-targets",
]
[[package]]
@ -5113,6 +5031,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "winreg"
version = "0.50.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1"
dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "x509-parser"
version = "0.14.0"

View file

@ -26,11 +26,15 @@ use std::{borrow::Cow, fmt::Display};
#[derive(Debug, Clone, Copy, serde::Serialize)]
pub enum RequestLimitError {
#[serde(rename(serialize = "maxSizeRequest"))]
Size,
SizeRequest,
#[serde(rename(serialize = "maxSizeUpload"))]
SizeUpload,
#[serde(rename(serialize = "maxCallsInRequest"))]
CallsIn,
#[serde(rename(serialize = "maxConcurrentRequests"))]
Concurrent,
ConcurrentRequest,
#[serde(rename(serialize = "maxConcurrentUpload"))]
ConcurrentUpload,
}
#[derive(Debug, serde::Serialize)]
@ -112,6 +116,17 @@ impl RequestError {
)
}
pub fn over_blob_quota(max_files: usize, max_bytes: usize) -> Self {
RequestError::blank(
403,
"Quota exceeded",
format!(
"You have exceeded the blob upload quota of {} files or {} bytes.",
max_files, max_bytes
),
)
}
pub fn too_many_requests() -> Self {
RequestError::blank(
429,
@ -134,18 +149,26 @@ impl RequestError {
status: 400,
title: None,
detail: match limit_type {
RequestLimitError::Size => concat!(
RequestLimitError::SizeRequest => concat!(
"The request is larger than the server ",
"is willing to process."
),
RequestLimitError::SizeUpload => concat!(
"The uploaded file is larger than the server ",
"is willing to process."
),
RequestLimitError::CallsIn => concat!(
"The request exceeds the maximum number ",
"of calls in a single request."
),
RequestLimitError::Concurrent => concat!(
RequestLimitError::ConcurrentRequest => concat!(
"The request exceeds the maximum number ",
"of concurrent requests."
),
RequestLimitError::ConcurrentUpload => concat!(
"The request exceeds the maximum number ",
"of concurrent uploads."
),
}
.into(),
limit: Some(limit_type),

View file

@ -182,6 +182,10 @@ impl SetError {
Self::new(SetErrorType::NotFound)
}
pub fn over_quota() -> Self {
Self::new(SetErrorType::OverQuota).with_description("Account quota exceeded.")
}
pub fn already_exists() -> Self {
Self::new(SetErrorType::AlreadyExists)
}

View file

@ -58,6 +58,7 @@ pub enum IndexAs {
LongInteger,
HasProperty,
Acl,
Quota,
#[default]
None,
}
@ -433,6 +434,14 @@ fn merge_batch(
_ => {}
}
}
IndexAs::Quota => {
if let Some(current_value) = current_value.try_cast_uint() {
batch.quota(-(current_value as i64));
}
if let Some(value) = value.try_cast_uint() {
batch.quota(value as i64);
}
}
IndexAs::None => (),
}
}
@ -559,6 +568,11 @@ fn build_batch(
}
}
}
(Value::UnsignedInt(bytes), IndexAs::Quota) => {
batch.ops.push(Operation::UpdateQuota {
bytes: if set { *bytes as i64 } else { -(*bytes as i64) },
});
}
(value, IndexAs::HasProperty) if value != &Value::Null => {
batch.ops.push(Operation::Bitmap {
family: ().family(),

View file

@ -72,7 +72,7 @@ impl Request {
Err(RequestError::not_request("Invalid JMAP request"))
}
} else {
Err(RequestError::limit(RequestLimitError::Size))
Err(RequestError::limit(RequestLimitError::SizeRequest))
}
}

View file

@ -194,7 +194,7 @@ impl WebSocketMessage {
_ => Err(RequestError::not_request("Invalid WebSocket JMAP request").into()),
}
} else {
Err(RequestError::limit(RequestLimitError::Size).into())
Err(RequestError::limit(RequestLimitError::SizeRequest).into())
}
}
}

View file

@ -35,7 +35,7 @@ use utils::codec::{
use crate::parser::{base32::JsonBase32Reader, json::Parser, JsonObjectParser};
use super::{collection::Collection, date::UTCDate};
use super::collection::Collection;
const B_LINKED: u8 = 0x10;
const B_LINKED_MAILDIR: u8 = 0x20;
@ -77,17 +77,11 @@ impl BlobId {
}
pub fn temporary(account_id: u32) -> Self {
let now_secs = now();
let now = UTCDate::from_timestamp(now_secs as i64);
Self {
kind: BlobKind::Temporary {
account_id,
creation_year: now.year,
creation_month: now.month,
creation_day: now.day,
seq: ((now_secs % 86400) as u32) << 15
| rand::thread_rng().gen_range(0u32..=32767u32),
timestamp: now(),
seq: rand::thread_rng().gen_range(0u32..=u32::MAX),
},
section: None,
}
@ -151,9 +145,7 @@ impl BlobId {
},
B_TEMPORARY => BlobKind::Temporary {
account_id: it.next_leb128()?,
creation_year: u16::from_be_bytes([*it.next()?.borrow(), *it.next()?.borrow()]),
creation_month: *it.next()?.borrow(),
creation_day: *it.next()?.borrow(),
timestamp: it.next_leb128()?,
seq: it.next_leb128()?,
},
_ => return None,
@ -198,16 +190,12 @@ impl BlobId {
}
BlobKind::Temporary {
account_id,
creation_year,
creation_month,
creation_day,
timestamp,
seq,
} => {
let _ = writer.write(&[kind | B_TEMPORARY]);
let _ = writer.write_leb128(*account_id);
let _ = writer.write(&creation_year.to_be_bytes()[..]);
let _ = writer.write(&[*creation_month]);
let _ = writer.write(&[*creation_day]);
let _ = writer.write_leb128(*timestamp);
let _ = writer.write_leb128(*seq);
}
}

View file

@ -35,19 +35,7 @@ use crate::{mailbox::set::SCHEMA, JMAP};
impl JMAP {
pub async fn delete_account(&self, account_id: u32) -> store::Result<()> {
// Delete blobs
self.store
.bulk_delete_blob(&store::BlobKind::Linked {
account_id,
collection: Collection::Email.into(),
document_id: 0,
})
.await?;
self.store
.bulk_delete_blob(&store::BlobKind::LinkedMaildir {
account_id,
document_id: 0,
})
.await?;
self.store.delete_account_blobs(account_id).await?;
// Delete mailboxes
let mut batch = BatchBuilder::new();

View file

@ -64,6 +64,15 @@ impl crate::Config {
upload_max_concurrent: settings
.property("jmap.protocol.upload.max-concurrent")?
.unwrap_or(4),
upload_tmp_quota_size: settings
.property("jmap.protocol.upload.quota.size")?
.unwrap_or(50000000),
upload_tmp_quota_amount: settings
.property("jmap.protocol.upload.quota.files")?
.unwrap_or(50000000),
upload_tmp_ttl: settings
.property_or_static::<Duration>("jmap.protocol.upload.ttl", "1h")?
.as_secs(),
mailbox_max_depth: settings.property("jmap.mailbox.max-depth")?.unwrap_or(10),
mailbox_name_max_len: settings
.property("jmap.mailbox.max-name-length")?

View file

@ -38,7 +38,6 @@ use jmap_proto::{
types::{blob::BlobId, id::Id},
};
use serde_json::Value;
use store::BlobKind;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
@ -80,6 +79,7 @@ pub async fn parse_jmap_request(
("", &Method::POST) => {
return match fetch_body(&mut req, jmap.config.request_max_size)
.await
.ok_or_else(|| RequestError::limit(RequestLimitError::SizeRequest))
.and_then(|bytes| {
Request::parse(
&bytes,
@ -128,7 +128,7 @@ pub async fn parse_jmap_request(
if let Some(account_id) = path.next().and_then(|p| Id::from_bytes(p.as_bytes()))
{
return match fetch_body(&mut req, jmap.config.upload_max_size).await {
Ok(bytes) => {
Some(bytes) => {
match jmap
.blob_upload(
account_id,
@ -145,7 +145,8 @@ pub async fn parse_jmap_request(
Err(err) => err.into_http_response(),
}
}
Err(err) => err.into_http_response(),
None => RequestError::limit(RequestLimitError::SizeUpload)
.into_http_response(),
};
}
}
@ -266,30 +267,7 @@ pub async fn parse_jmap_request(
};
}
("blob", "purge", &Method::GET) => {
let mut date = [u16::MAX; 3];
for part in date.iter_mut() {
if let Some(item) = path.next().and_then(|s| s.parse::<u16>().ok()) {
*part = item;
} else {
return RequestError::blank(
StatusCode::BAD_REQUEST.as_u16(),
"Invalid parameters",
"Expected YYYY/MM/DD date",
)
.into_http_response();
}
}
return match jmap
.store
.bulk_delete_blob(&BlobKind::Temporary {
creation_year: date[0],
creation_month: date[1] as u8,
creation_day: date[2] as u8,
account_id: 0,
seq: 0,
})
.await
{
return match jmap.store.purge_tmp_blobs(jmap.config.upload_tmp_ttl).await {
Ok(_) => {
JsonResponse::new(Value::String("success".into())).into_http_response()
}
@ -396,18 +374,18 @@ async fn handle_request<T: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
}
}
pub async fn fetch_body(req: &mut HttpRequest, max_size: usize) -> Result<Vec<u8>, RequestError> {
pub async fn fetch_body(req: &mut HttpRequest, max_size: usize) -> Option<Vec<u8>> {
let mut bytes = Vec::with_capacity(1024);
while let Some(Ok(frame)) = req.frame().await {
if let Some(data) = frame.data_ref() {
if bytes.len() + data.len() <= max_size {
bytes.extend_from_slice(data);
} else {
return Err(RequestError::limit(RequestLimitError::Size));
return None;
}
}
}
Ok(bytes)
bytes.into()
}
pub trait ToHttpResponse {

View file

@ -223,7 +223,7 @@ pub async fn parse_form_data(
.and_then(|val| val.parse::<mime::Mime>().ok()),
fetch_body(req, 2048).await,
) {
(Some(content_type), Ok(body)) => {
(Some(content_type), Some(body)) => {
let mut fields = HashMap::new();
if let Some(boundary) = content_type.get_param(mime::BOUNDARY) {
for mut field in form_data::FormData::new(&body[..], boundary.as_str()).flatten() {

View file

@ -107,7 +107,7 @@ impl JMAP {
} else if access_token.is_super_user() {
Ok(InFlight::default())
} else {
Err(RequestError::limit(RequestLimitError::Concurrent))
Err(RequestError::limit(RequestLimitError::ConcurrentRequest))
}
} else if access_token.is_super_user() {
Ok(InFlight::default())
@ -140,7 +140,7 @@ impl JMAP {
} else if access_token.is_super_user() {
Ok(InFlight::default())
} else {
Err(RequestError::limit(RequestLimitError::Concurrent))
Err(RequestError::limit(RequestLimitError::ConcurrentUpload))
}
}

View file

@ -33,6 +33,10 @@ use crate::{auth::AccessToken, JMAP};
use super::UploadResponse;
#[cfg(feature = "test_mode")]
pub static DISABLE_UPLOAD_QUOTA: std::sync::atomic::AtomicBool =
std::sync::atomic::AtomicBool::new(true);
impl JMAP {
pub async fn blob_upload(
&self,
@ -52,6 +56,40 @@ impl JMAP {
}
}
// Enforce quota
let (total_files, total_bytes) = self
.store
.get_tmp_blob_usage(account_id.document_id(), self.config.upload_tmp_ttl)
.await
.map_err(|err| {
tracing::error!(event = "error",
context = "blob_store",
account_id = account_id.document_id(),
error = ?err,
"Failed to obtain blob quota");
RequestError::internal_server_error()
})?;
if ((self.config.upload_tmp_quota_size > 0
&& total_bytes + data.len() > self.config.upload_tmp_quota_size)
|| (self.config.upload_tmp_quota_amount > 0
&& total_files + 1 > self.config.upload_tmp_quota_amount))
&& !access_token.is_super_user()
{
let err = Err(RequestError::over_blob_quota(
self.config.upload_tmp_quota_amount,
self.config.upload_tmp_quota_size,
));
#[cfg(feature = "test_mode")]
if !DISABLE_UPLOAD_QUOTA.load(std::sync::atomic::Ordering::Relaxed) {
return err;
}
#[cfg(not(feature = "test_mode"))]
return err;
}
let blob_id = BlobId::temporary(account_id.document_id());
match self.store.put_blob(&blob_id.kind, data).await {

View file

@ -103,6 +103,9 @@ impl JMAP {
let on_success_delete = request.on_success_destroy_original.unwrap_or(false);
let mut destroy_ids = Vec::new();
// Obtain quota
let account_quota = self.get_quota(access_token, account_id).await?;
'create: for (id, create) in request.create {
let id = id.unwrap();
let from_message_id = id.document_id();
@ -243,6 +246,16 @@ impl JMAP {
continue;
};
// Check quota
if account_quota > 0
&& metadata.get(&Property::Size).as_uint().unwrap_or_default() as i64
+ self.get_used_quota(account_id).await?
> account_quota
{
response.not_created.append(id, SetError::over_quota());
continue;
}
// Set receivedAt
if let Some(received_at) = received_at {
metadata.set(Property::ReceivedAt, Value::Date(received_at));

View file

@ -36,10 +36,13 @@ use jmap_proto::{
type_state::TypeState,
},
};
use mail_parser::Message;
use utils::map::vec_map::VecMap;
use crate::{auth::AccessToken, IngestError, JMAP};
use super::ingest::IngestEmail;
impl JMAP {
pub async fn email_import(
&self,
@ -61,6 +64,9 @@ impl JMAP {
None
};
// Obtain quota
let account_quota = self.get_quota(access_token, account_id).await?;
let mut response = ImportEmailResponse {
account_id: request.account_id,
new_state: old_state.clone(),
@ -126,14 +132,16 @@ impl JMAP {
// Import message
match self
.email_ingest(
(&raw_message).into(),
.email_ingest(IngestEmail {
raw_message: &raw_message,
message: Message::parse(&raw_message),
account_id,
account_quota,
mailbox_ids,
email.keywords,
email.received_at.map(|r| r.into()),
false,
)
keywords: email.keywords,
received_at: email.received_at.map(|r| r.into()),
skip_duplicates: false,
})
.await
{
Ok(email) => {
@ -145,6 +153,13 @@ impl JMAP {
SetError::new(SetErrorType::InvalidEmail).with_description(reason),
);
}
Err(IngestError::OverQuota) => {
response.not_created.append(
id,
SetError::new(SetErrorType::OverQuota)
.with_description("You have exceeded your disk quota."),
);
}
Err(IngestError::Temporary) => {
return Err(MethodError::ServerPartialFail);
}

View file

@ -88,7 +88,8 @@ impl IndexMessage for BatchBuilder {
// Index size
metadata.append(Property::Size, message.raw_message.len());
self.value(Property::Size, message.raw_message.len() as u32, F_INDEX);
self.value(Property::Size, message.raw_message.len() as u32, F_INDEX)
.quota(message.raw_message.len() as i64);
// Index receivedAt
metadata.append(
@ -418,7 +419,13 @@ impl IntoOperations for EmailIndexBuilder {
for (property, value) in self.inner.properties {
match (&property, value) {
(Property::Size, Value::UnsignedInt(size)) => {
batch.value(Property::Size, size as u32, F_INDEX | options);
batch
.value(Property::Size, size as u32, F_INDEX | options)
.quota(if self.set {
size as i64
} else {
-(size as i64)
});
}
(Property::ReceivedAt | Property::SentAt, Value::Date(date)) => {
batch.value(property, date.timestamp() as u64, F_INDEX | options);

View file

@ -57,22 +57,36 @@ pub struct IngestedEmail {
pub struct IngestEmail<'x> {
pub raw_message: &'x [u8],
pub message: Option<Message<'x>>,
pub account_id: u32,
pub account_quota: i64,
pub mailbox_ids: Vec<u32>,
pub keywords: Vec<Keyword>,
pub received_at: Option<u64>,
pub skip_duplicates: bool,
}
impl JMAP {
#[allow(clippy::blocks_in_if_conditions)]
pub async fn email_ingest(
&self,
ingest_email: IngestEmail<'_>,
account_id: u32,
mailbox_ids: Vec<u32>,
keywords: Vec<Keyword>,
received_at: Option<u64>,
skip_duplicates: bool,
params: IngestEmail<'_>,
) -> Result<IngestedEmail, IngestError> {
// Check quota
let raw_message_len = params.raw_message.len() as i64;
if params.account_quota > 0
&& raw_message_len
+ self
.get_used_quota(params.account_id)
.await
.map_err(|_| IngestError::Temporary)?
> params.account_quota
{
return Err(IngestError::OverQuota);
}
// Parse message
let raw_message = ingest_email.raw_message;
let message = ingest_email.message.ok_or_else(|| IngestError::Permanent {
let raw_message = params.raw_message;
let message = params.message.ok_or_else(|| IngestError::Permanent {
code: [5, 5, 0],
reason: "Failed to parse e-mail message.".to_string(),
})?;
@ -118,12 +132,12 @@ impl JMAP {
}
// Check for duplicates
if skip_duplicates
if params.skip_duplicates
&& !references.is_empty()
&& !self
.store
.filter(
account_id,
params.account_id,
Collection::Email,
references
.iter()
@ -151,7 +165,7 @@ impl JMAP {
}
let thread_id = if !references.is_empty() {
self.find_or_merge_thread(account_id, subject, &references)
self.find_or_merge_thread(params.account_id, subject, &references)
.await?
} else {
None
@ -160,7 +174,7 @@ impl JMAP {
// Obtain a documentId and changeId
let document_id = self
.store
.assign_document_id(account_id, Collection::Email)
.assign_document_id(params.account_id, Collection::Email)
.await
.map_err(|err| {
tracing::error!(
@ -172,7 +186,7 @@ impl JMAP {
})?;
let change_id = self
.store
.assign_change_id(account_id)
.assign_change_id(params.account_id)
.await
.map_err(|err| {
tracing::error!(
@ -184,7 +198,7 @@ impl JMAP {
})?;
// Store blob
let blob_id = BlobId::maildir(account_id, document_id);
let blob_id = BlobId::maildir(params.account_id, document_id);
self.store
.put_blob(&blob_id.kind, raw_message)
.await
@ -199,7 +213,7 @@ impl JMAP {
// Prepare batch
let mut batch = BatchBuilder::new();
batch.with_account_id(account_id);
batch.with_account_id(params.account_id);
// Build change log
let mut changes = ChangeLogBuilder::with_change_id(change_id);
@ -209,7 +223,7 @@ impl JMAP {
} else {
let thread_id = self
.store
.assign_document_id(account_id, Collection::Thread)
.assign_document_id(params.account_id, Collection::Thread)
.await
.map_err(|err| {
tracing::error!(
@ -227,7 +241,7 @@ impl JMAP {
};
let id = Id::from_parts(thread_id, document_id);
changes.log_insert(Collection::Email, id);
for mailbox_id in &mailbox_ids {
for mailbox_id in &params.mailbox_ids {
changes.log_child_update(Collection::Mailbox, *mailbox_id);
}
@ -237,9 +251,9 @@ impl JMAP {
.create_document(document_id)
.index_message(
message,
keywords,
mailbox_ids,
received_at.unwrap_or_else(now),
params.keywords,
params.mailbox_ids,
params.received_at.unwrap_or_else(now),
self.config.default_language,
)
.map_err(|err| {
@ -265,7 +279,7 @@ impl JMAP {
id,
change_id,
blob_id,
size: raw_message.len(),
size: raw_message_len as usize,
})
}
@ -434,33 +448,6 @@ impl JMAP {
}
}
impl<'x> From<&'x [u8]> for IngestEmail<'x> {
fn from(raw_message: &'x [u8]) -> Self {
IngestEmail {
raw_message,
message: Message::parse(raw_message),
}
}
}
impl<'x> From<&'x Vec<u8>> for IngestEmail<'x> {
fn from(raw_message: &'x Vec<u8>) -> Self {
IngestEmail {
raw_message,
message: Message::parse(raw_message),
}
}
}
impl<'x> IngestEmail<'x> {
pub fn new(raw_message: &'x [u8], message: Message<'x>) -> Self {
IngestEmail {
raw_message,
message: message.into(),
}
}
}
impl From<IngestedEmail> for Object<Value> {
fn from(email: IngestedEmail) -> Self {
Object::with_capacity(3)

View file

@ -50,6 +50,7 @@ use mail_builder::{
mime::{BodyPart, MimePart},
MessageBuilder,
};
use mail_parser::Message;
use store::{
ahash::AHashSet,
fts::term_index::TokenIndex,
@ -60,11 +61,12 @@ use store::{
BlobKind, Serialize, ValueKey,
};
use crate::{auth::AccessToken, JMAP};
use crate::{auth::AccessToken, IngestError, JMAP};
use super::{
headers::{BuildHeader, ValueToHeader},
index::EmailIndexBuilder,
ingest::IngestEmail,
};
impl JMAP {
@ -106,6 +108,9 @@ impl JMAP {
let will_destroy = request.unwrap_destroy();
// Obtain quota
let account_quota = self.get_quota(access_token, account_id).await?;
// Process creates
'create: for (id, mut object) in request.unwrap_create() {
let has_body_structure = object
@ -722,20 +727,31 @@ impl JMAP {
builder.write_to(&mut raw_message).unwrap_or_default();
// Ingest message
response.created.insert(
id,
self.email_ingest(
(&raw_message).into(),
match self
.email_ingest(IngestEmail {
raw_message: &raw_message,
message: Message::parse(&raw_message),
account_id,
mailboxes,
account_quota,
mailbox_ids: mailboxes,
keywords,
received_at,
false,
)
skip_duplicates: false,
})
.await
.map_err(|_| MethodError::ServerPartialFail)?
.into(),
);
{
Ok(message) => {
response.created.insert(id, message.into());
}
Err(IngestError::OverQuota) => {
response.not_created.append(
id,
SetError::new(SetErrorType::OverQuota)
.with_description("You have exceeded your disk quota."),
);
}
Err(_) => return Err(MethodError::ServerPartialFail),
}
}
// Process updates

View file

@ -111,6 +111,10 @@ pub struct Config {
pub upload_max_size: usize,
pub upload_max_concurrent: usize,
pub upload_tmp_quota_size: usize,
pub upload_tmp_quota_amount: usize,
pub upload_tmp_ttl: u64,
pub mailbox_max_depth: usize,
pub mailbox_name_max_len: usize,
pub mail_attachments_max_size: usize,
@ -150,6 +154,7 @@ pub struct Bincode<T: serde::Serialize + serde::de::DeserializeOwned> {
pub enum IngestError {
Temporary,
OverQuota,
Permanent { code: [u8; 3], reason: String },
}
@ -527,6 +532,43 @@ impl JMAP {
)
}
pub async fn get_quota(
&self,
access_token: &AccessToken,
account_id: u32,
) -> Result<i64, MethodError> {
Ok(if access_token.primary_id == account_id {
access_token.quota as i64
} else {
self.directory
.principal_by_id(account_id)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "get_quota",
account_id = account_id,
error = ?err,
"Failed to obtain disk quota for account.");
MethodError::ServerPartialFail
})?
.map(|p| p.quota as i64)
.unwrap_or_default()
})
}
pub async fn get_used_quota(&self, account_id: u32) -> Result<i64, MethodError> {
self.store.get_quota(account_id).await.map_err(|err| {
tracing::error!(
event = "error",
context = "get_used_quota",
account_id = account_id,
error = ?err,
"Failed to obtain used disk quota for account.");
MethodError::ServerPartialFail
})
}
pub async fn filter(
&self,
account_id: u32,

View file

@ -24,8 +24,7 @@
use std::{sync::Arc, time::Duration};
use chrono::{Datelike, TimeZone};
use jmap_proto::types::date::UTCDate;
use store::{write::now, BlobKind};
use store::write::now;
use tokio::sync::mpsc;
use utils::{config::Config, failed, UnwrapFailure};
@ -108,23 +107,9 @@ pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Recei
}
}
TASK_PURGE_BLOBS => {
let ts = UTCDate::from_timestamp((now - 86400) as i64);
tracing::info!(
"Purging temporary blobs for {}/{}/{}.",
ts.day,
ts.month,
ts.year
);
if let Err(err) = core
.store
.bulk_delete_blob(&BlobKind::Temporary {
creation_year: ts.year,
creation_month: ts.month,
creation_day: ts.day,
account_id: 0,
seq: 0,
})
.await
tracing::info!("Purging temporary blobs.",);
if let Err(err) =
core.store.purge_tmp_blobs(core.config.upload_tmp_ttl).await
{
tracing::error!("Error while purging bitmaps: {}", err);
}

View file

@ -22,10 +22,11 @@
*/
use jmap_proto::types::{state::StateChange, type_state::TypeState};
use mail_parser::Message;
use store::ahash::AHashMap;
use utils::ipc::{DeliveryResult, IngestMessage};
use crate::{mailbox::INBOX_ID, IngestError, JMAP};
use crate::{email::ingest::IngestEmail, mailbox::INBOX_ID, IngestError, JMAP};
impl JMAP {
pub async fn deliver_message(&self, message: IngestMessage) -> Vec<DeliveryResult> {
@ -67,14 +68,27 @@ impl JMAP {
.await
}
Ok(None) => {
self.email_ingest(
(&raw_message).into(),
*uid,
vec![INBOX_ID],
vec![],
None,
true,
)
let account_quota = match self.directory.principal_by_id(*uid).await {
Ok(Some(p)) => p.quota as i64,
Ok(None) => 0,
Err(_) => {
*status = DeliveryResult::TemporaryFailure {
reason: "Transient server failure.".into(),
};
continue;
}
};
self.email_ingest(IngestEmail {
raw_message: &raw_message,
message: Message::parse(&raw_message),
account_id: *uid,
account_quota,
mailbox_ids: vec![INBOX_ID],
keywords: vec![],
received_at: None,
skip_duplicates: true,
})
.await
}
Err(_) => {
@ -100,6 +114,11 @@ impl JMAP {
}
}
Err(err) => match err {
IngestError::OverQuota => {
*status = DeliveryResult::TemporaryFailure {
reason: "Mailbox over quota.".into(),
}
}
IngestError::Temporary => {
*status = DeliveryResult::TemporaryFailure {
reason: "Transient server failure.".into(),

View file

@ -97,7 +97,7 @@ impl JMAP {
}
Property::BlobId => {
if let Some(Value::UnsignedInt(blob_size)) =
push.properties.remove(&Property::BlobId)
push.properties.remove(&Property::Size)
{
result.append(
Property::BlobId,
@ -214,7 +214,7 @@ impl JMAP {
// Obtain the sieve script length
let script_offset = script_object
.properties
.get(&Property::BlobId)
.get(&Property::Size)
.and_then(|value| value.as_uint())
.ok_or_else(|| {
tracing::warn!(

View file

@ -412,7 +412,14 @@ impl JMAP {
}
// Deliver messages
let mut has_temp_errors = false;
let account_quota = match self.directory.principal_by_id(account_id).await {
Ok(Some(p)) => p.quota as i64,
Ok(None) => 0,
Err(_) => {
return Err(IngestError::Temporary);
}
};
let mut last_temp_error = None;
let mut has_delivered = false;
for (message_id, sieve_message) in messages.into_iter().enumerate() {
if !sieve_message.file_into.is_empty() {
@ -432,22 +439,24 @@ impl JMAP {
// Deliver message
match self
.email_ingest(
IngestEmail::new(&sieve_message.raw_message, message),
.email_ingest(IngestEmail {
raw_message: &sieve_message.raw_message,
message: message.into(),
account_id,
sieve_message.file_into,
sieve_message.flags,
None,
true,
)
account_quota,
mailbox_ids: sieve_message.file_into,
keywords: sieve_message.flags,
received_at: None,
skip_duplicates: true,
})
.await
{
Ok(ingested_message_) => {
has_delivered = true;
ingested_message = ingested_message_;
}
Err(_) => {
has_temp_errors = true;
Err(err) => {
last_temp_error = err.into();
}
}
}
@ -474,11 +483,11 @@ impl JMAP {
code: [5, 7, 1],
reason: reject_reason,
})
} else if has_delivered || !has_temp_errors {
} else if has_delivered || last_temp_error.is_none() {
Ok(ingested_message)
} else {
// There were problems during delivery
Err(IngestError::Temporary)
Err(last_temp_error.unwrap())
}
}
}

View file

@ -54,6 +54,7 @@ use crate::{auth::AccessToken, JMAP};
struct SetContext<'x> {
account_id: u32,
account_quota: i64,
access_token: &'x AccessToken,
response: SetResponse,
}
@ -67,6 +68,7 @@ pub static SCHEMA: &[IndexProperty] = &[
.max_size(255)
.required(),
IndexProperty::new(Property::IsActive).index_as(IndexAs::Integer),
IndexProperty::new(Property::Size).index_as(IndexAs::Quota),
];
impl JMAP {
@ -82,6 +84,7 @@ impl JMAP {
.unwrap_or_default();
let mut ctx = SetContext {
account_id,
account_quota: self.get_quota(access_token, account_id).await?,
access_token,
response: self
.prepare_set_response(&request, Collection::SieveScript)
@ -451,10 +454,18 @@ impl JMAP {
}) {
// Check access
if let Some(mut bytes) = self.blob_download(&blob_id, ctx.access_token).await? {
// Check quota
if ctx.account_quota > 0
&& bytes.len() as i64 + self.get_used_quota(ctx.account_id).await?
> ctx.account_quota
{
return Ok(Err(SetError::over_quota()));
}
// Compile script
match self.sieve_compiler.compile(&bytes) {
Ok(script) => {
changes.set(Property::BlobId, Value::UnsignedInt(bytes.len() as u64));
changes.set(Property::Size, Value::UnsignedInt(bytes.len() as u64));
bytes.extend(bincode::serialize(&script).unwrap_or_default());
bytes.into()
}

View file

@ -393,7 +393,7 @@ impl JMAP {
match self.sieve_compiler.compile(&script) {
Ok(compiled_script) => {
// Update blob length
obj.set(Property::BlobId, Value::UnsignedInt(script.len() as u64));
obj.set(Property::Size, Value::UnsignedInt(script.len() as u64));
// Serialize script
script.extend(bincode::serialize(&compiled_script).unwrap_or_default());

View file

@ -8,7 +8,7 @@ resolver = "2"
utils = { path = "../utils" }
maybe-async = { path = "../maybe-async" }
rocksdb = { version = "0.20.1", optional = true }
foundationdb = { version = "0.7.0", optional = true }
foundationdb = { version = "0.8.0", optional = true }
rusqlite = { version = "0.29.0", features = ["bundled"], optional = true }
rust-s3 = { version = "0.33.0", default-features = false, features = ["tokio-rustls-tls"] }
tokio = { version = "1.23", features = ["sync", "fs", "io-util"] }

View file

@ -29,7 +29,7 @@ use futures::StreamExt;
use crate::{
write::key::KeySerializer, Store, SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
SUBSPACE_VALUES,
SUBSPACE_QUOTAS, SUBSPACE_VALUES,
};
use super::bitmap::DenseBitmap;
@ -116,6 +116,18 @@ impl Store {
}
}
// Delete quota key
let trx = self.db.create_trx()?;
trx.clear(
&KeySerializer::new(5)
.write(SUBSPACE_QUOTAS)
.write(account_id)
.finalize(),
);
if let Err(err) = trx.commit().await {
return Err(FdbError::from(err).into());
}
Ok(())
}
}

View file

@ -37,7 +37,7 @@ use crate::{
query::Operator,
write::key::{DeserializeBigEndian, KeySerializer},
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ReadTransaction, Serialize,
Store, SUBSPACE_INDEXES,
Store, SUBSPACE_INDEXES, SUBSPACE_QUOTAS,
};
use super::bitmap::DeserializeBlock;
@ -353,6 +353,29 @@ impl ReadTransaction<'_> {
Ok(None)
}
pub async fn get_quota(&self, account_id: u32) -> crate::Result<i64> {
if let Some(bytes) = self
.trx
.get(
&KeySerializer::new(5)
.write(SUBSPACE_QUOTAS)
.write(account_id)
.finalize(),
true,
)
.await?
{
Ok(i64::from_le_bytes(bytes[..].try_into().map_err(|_| {
crate::Error::InternalError(format!(
"Invalid quota value for account {}",
account_id
))
})?))
} else {
Ok(0)
}
}
pub async fn refresh_if_old(&mut self) -> crate::Result<()> {
if self.trx_age.elapsed() > Duration::from_millis(2000) {
self.trx = self.db.create_trx()?;
@ -433,6 +456,13 @@ impl Store {
value
);
}
SUBSPACE_QUOTAS => {
let v = i64::from_le_bytes(value[..].try_into().unwrap());
if v != 0 {
let k = u32::from_be_bytes(key[1..].try_into().unwrap());
panic!("Table quotas is not empty: {k:?} = {v:?} (key {key:?})");
}
}
SUBSPACE_LOGS => (),
_ => panic!("Invalid key found in database: {key:?} for subspace {subspace}"),

View file

@ -36,7 +36,8 @@ use crate::{
key::{DeserializeBigEndian, KeySerializer},
now, Batch, Operation,
},
AclKey, BitmapKey, Deserialize, IndexKey, LogKey, Serialize, Store, ValueKey, SUBSPACE_VALUES,
AclKey, BitmapKey, Deserialize, IndexKey, LogKey, Serialize, Store, ValueKey, SUBSPACE_QUOTAS,
SUBSPACE_VALUES,
};
use super::bitmap::{next_available_index, DenseBitmap, BITS_PER_BLOCK};
@ -203,6 +204,16 @@ impl Store {
return Err(crate::Error::AssertValueFailed);
}
}
Operation::UpdateQuota { bytes } => {
trx.atomic_op(
&KeySerializer::new(5)
.write(SUBSPACE_QUOTAS)
.write(account_id)
.finalize(),
&bytes.to_le_bytes()[..],
MutationType::Add,
);
}
}
}
@ -257,7 +268,7 @@ impl Store {
.entry(key.clone())
.or_default()
.insert(document_id),
"key {key:?} already contains document {document_id}"
"key {key:?} ({op:?}) already contains document {document_id}"
);
} else {
assert!(
@ -266,7 +277,7 @@ impl Store {
.get_mut(&key)
.unwrap()
.remove(&document_id),
"key {key:?} does not contain document {document_id}"
"key {key:?} ({op:?}) does not contain document {document_id}"
);
}
}
@ -476,5 +487,6 @@ impl Store {
let trx = self.db.create_trx().unwrap();
trx.clear_range(&[0u8], &[u8::MAX]);
trx.commit().await.unwrap();
BITMAPS.lock().clear();
}
}

View file

@ -101,6 +101,14 @@ impl Store {
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS q (
k INTEGER PRIMARY KEY,
v INTEGER NOT NULL DEFAULT 0
)",
[],
)?;
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (

View file

@ -81,6 +81,8 @@ impl Store {
))?
.execute([&from_key, &to_key])?;
}
conn.prepare_cached("DELETE FROM q WHERE k = ?")?
.execute([account_id as i64])?;
Ok(())
})

View file

@ -334,6 +334,19 @@ impl ReadTransaction<'_> {
.map_err(Into::into)
}
#[maybe_async::maybe_async]
pub(crate) async fn get_quota(&self, account_id: u32) -> crate::Result<i64> {
match self
.conn
.prepare_cached("SELECT v FROM q WHERE k = ?")?
.query_row([account_id as i64], |row| row.get::<_, i64>(0))
{
Ok(value) => Ok(value),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(0),
Err(e) => Err(e.into()),
}
}
#[maybe_async::maybe_async]
pub async fn refresh_if_old(&mut self) -> crate::Result<()> {
Ok(())
@ -352,6 +365,7 @@ impl Store {
#[cfg(feature = "test_mode")]
pub async fn assert_is_empty(&self) {
let conn = self.read_transaction().unwrap();
// Values
let mut query = conn.conn.prepare_cached("SELECT k, v FROM v").unwrap();
let mut rows = query.query([]).unwrap();
@ -362,6 +376,7 @@ impl Store {
panic!("Table values is not empty: {key:?} {value:?}");
}
// Indexes
let mut query = conn.conn.prepare_cached("SELECT k FROM i").unwrap();
let mut rows = query.query([]).unwrap();
@ -379,6 +394,7 @@ impl Store {
);
}
// Bitmaps
self.purge_bitmaps().await.unwrap();
let mut query = conn
.conn
@ -397,6 +413,21 @@ impl Store {
panic!("Table bitmaps failed to purge, found key: {key:?}");
}
// Quotas
let mut query = conn.conn.prepare_cached("SELECT k, v FROM q").unwrap();
let mut rows = query.query([]).unwrap();
while let Some(row) = rows.next().unwrap() {
let key = row.get::<_, i64>(0).unwrap();
let value = row.get::<_, i64>(1).unwrap();
if value != 0 {
panic!(
"Table quota is not empty, account {}, quota: {}",
key, value,
);
}
}
// Delete logs
conn.conn.execute("DELETE FROM l", []).unwrap();

View file

@ -245,6 +245,18 @@ impl Store {
return Err(crate::Error::AssertValueFailed);
}
}
Operation::UpdateQuota { bytes } => {
if *bytes >= 0 {
trx.prepare_cached(concat!(
"INSERT INTO q (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + excluded.v"
))?
.execute(params![account_id, *bytes])?;
} else {
trx.prepare_cached("UPDATE q SET v = v + ? WHERE k = ?")?
.execute(params![*bytes, account_id])?;
}
}
}
}
@ -271,7 +283,9 @@ impl Store {
#[cfg(feature = "test_mode")]
pub async fn destroy(&self) {
use crate::{SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES};
use crate::{
SUBSPACE_BITMAPS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_QUOTAS, SUBSPACE_VALUES,
};
let conn = self.conn_pool.get().unwrap();
for table in [
@ -279,6 +293,7 @@ impl Store {
SUBSPACE_LOGS,
SUBSPACE_BITMAPS,
SUBSPACE_INDEXES,
SUBSPACE_QUOTAS,
] {
conn.execute(&format!("DROP TABLE {}", char::from(table)), [])
.unwrap();

View file

@ -146,43 +146,12 @@ fn get_local_path(base_path: &BlobPaths, kind: &BlobKind) -> PathBuf {
}
BlobKind::Temporary {
account_id,
creation_year,
creation_month,
creation_day,
timestamp,
seq,
} => {
let mut path = base_path.path_temporary.to_path_buf();
path.push(creation_year.to_string());
path.push(creation_month.to_string());
path.push(creation_day.to_string());
path.push(format!("{:x}_{:x}", account_id, seq));
path
}
}
}
fn get_local_root_path(base_path: &BlobPaths, kind: &BlobKind) -> PathBuf {
match kind {
BlobKind::LinkedMaildir { account_id, .. } => {
let mut path = base_path.path_email.to_path_buf();
path.push(format!("{:x}", account_id));
path
}
BlobKind::Linked { account_id, .. } => {
let mut path = base_path.path_other.to_path_buf();
path.push(format!("{:x}", account_id));
path
}
BlobKind::Temporary {
creation_year,
creation_month,
creation_day,
..
} => {
let mut path = base_path.path_temporary.to_path_buf();
path.push(creation_year.to_string());
path.push(creation_month.to_string());
path.push(creation_day.to_string());
path.push(format!("{:x}_{:x}", timestamp, seq));
path
}
}
@ -201,33 +170,8 @@ fn get_s3_path(kind: &BlobKind) -> String {
} => format!("/{:x}/{:x}/{:x}", account_id, collection, document_id),
BlobKind::Temporary {
account_id,
creation_year,
creation_month,
creation_day,
timestamp,
seq,
} => format!(
"/tmp/{}/{}/{}/{:x}_{:x}",
creation_year, creation_month, creation_day, account_id, seq
),
}
}
fn get_s3_root_path(kind: &BlobKind) -> String {
match kind {
BlobKind::LinkedMaildir { account_id, .. } => {
format!("/{:x}/", account_id)
}
BlobKind::Linked { account_id, .. } => {
format!("/{:x}/", account_id)
}
BlobKind::Temporary {
creation_year,
creation_month,
creation_day,
..
} => format!(
"/tmp/{}/{}/{}/",
creation_year, creation_month, creation_day
),
} => format!("/tmp/{:x}/{:x}_{:x}", account_id, timestamp, seq),
}
}

View file

@ -28,9 +28,9 @@ use tokio::{
io::AsyncWriteExt,
};
use crate::{BlobKind, Store};
use crate::{write::now, BlobKind, Store};
use super::{get_local_path, get_local_root_path, get_s3_path, get_s3_root_path, BlobStore};
use super::{get_local_path, get_s3_path, BlobStore};
impl Store {
pub async fn put_blob(&self, kind: &BlobKind, data: &[u8]) -> crate::Result<()> {
@ -124,22 +124,179 @@ impl Store {
}
}
pub async fn bulk_delete_blob(&self, kind: &BlobKind) -> crate::Result<()> {
pub async fn delete_account_blobs(&self, account_id: u32) -> crate::Result<()> {
match &self.blob {
BlobStore::Local(base_path) => fs::remove_dir_all(get_local_root_path(base_path, kind))
.await
.map_err(Into::into),
BlobStore::Local(base_path) => {
for path in [
&base_path.path_email,
&base_path.path_other,
&base_path.path_temporary,
] {
let mut path = path.to_path_buf();
path.push(format!("{:x}", account_id));
if fs::metadata(&path).await.is_ok() {
fs::remove_dir_all(path).await?;
}
}
Ok(())
}
BlobStore::Remote(bucket) => {
let prefix = get_s3_root_path(kind);
for prefix in [
format!("/{:x}/", account_id),
format!("/tmp/{:x}/", account_id),
] {
let prefix_base = prefix.strip_prefix('/').unwrap();
for object in bucket
.list(prefix.clone(), None)
.await?
.into_iter()
.flat_map(|result| result.contents)
{
if object.key.starts_with(&prefix) || object.key.starts_with(prefix_base) {
let result = bucket.delete_object(object.key).await?;
if !(200..300).contains(&result.status_code()) {
return Err(crate::Error::InternalError(format!(
"Failed to delete bucket item, code {}: {}",
result.status_code(),
String::from_utf8_lossy(result.as_slice())
)));
}
} else {
tracing::debug!("Unexpected S3 object while deleting: {}", object.key);
}
}
}
Ok(())
}
}
}
pub async fn purge_tmp_blobs(&self, ttl: u64) -> crate::Result<()> {
let now = now();
match &self.blob {
BlobStore::Local(base_path) => {
if fs::metadata(&base_path.path_temporary).await.is_ok() {
let mut dir = fs::read_dir(&base_path.path_temporary).await?;
while let Some(item) = dir.next_entry().await? {
if item.metadata().await?.is_dir() {
let mut dir = fs::read_dir(item.path()).await?;
while let Some(item) = dir.next_entry().await? {
if item.metadata().await?.is_file() {
if let Some(timestamp) =
item.file_name().to_str().and_then(parse_timestamp)
{
if now.saturating_sub(timestamp) > ttl {
fs::remove_file(item.path()).await?;
}
} else {
tracing::debug!(
"Found invalid temporary filename while purging: {}",
item.file_name().to_string_lossy()
);
}
}
}
}
}
}
Ok(())
}
BlobStore::Remote(bucket) => {
for object in bucket
.list("/tmp/".to_string(), None)
.await?
.into_iter()
.flat_map(|result| result.contents)
{
if object.key.starts_with("/tmp/") || object.key.starts_with("tmp/") {
if let Some(timestamp) = object
.key
.rsplit_once('/')
.and_then(|(_, name)| parse_timestamp(name))
{
if now.saturating_sub(timestamp) > ttl {
let result = bucket.delete_object(object.key).await?;
if !(200..300).contains(&result.status_code()) {
return Err(crate::Error::InternalError(format!(
"Failed to delete bucket item, code {}: {}",
result.status_code(),
String::from_utf8_lossy(result.as_slice())
)));
}
}
} else {
tracing::debug!(
"Found invalid temporary filename while purging: {}",
object.key
);
}
} else {
tracing::debug!("Unexpected S3 object while purging: {}", object.key);
}
}
Ok(())
}
}
}
pub async fn get_tmp_blob_usage(
&self,
account_id: u32,
ttl: u64,
) -> crate::Result<(usize, usize)> {
let now = now();
let mut total_bytes = 0;
let mut total_files = 0;
match &self.blob {
BlobStore::Local(base_path) => {
let mut path = base_path.path_temporary.to_path_buf();
path.push(format!("{:x}", account_id));
if fs::metadata(&path).await.is_ok() {
let mut dir = fs::read_dir(path).await?;
while let Some(item) = dir.next_entry().await? {
match item.metadata().await {
Ok(metadata) if metadata.is_file() => {
if let Some(timestamp) =
item.file_name().to_str().and_then(parse_timestamp)
{
if now.saturating_sub(timestamp) > ttl {
fs::remove_file(item.path()).await?;
} else {
total_bytes += metadata.len() as usize;
total_files += 1;
}
} else {
tracing::debug!(
"Found invalid temporary filename while purging: {}",
item.file_name().to_string_lossy()
);
}
}
_ => (),
}
}
}
}
BlobStore::Remote(bucket) => {
let prefix = format!("/tmp/{:x}/", account_id);
let prefix_base = prefix.strip_prefix('/').unwrap();
let mut is_truncated = true;
while is_truncated {
for item in bucket.list(prefix.clone(), None).await? {
is_truncated = item.is_truncated && !item.contents.is_empty();
for object in item.contents {
if object.key.starts_with(&prefix)
|| object.key.starts_with(prefix_base)
{
for object in bucket
.list(prefix.clone(), None)
.await?
.into_iter()
.flat_map(|result| result.contents)
{
if object.key.starts_with(prefix_base) || object.key.starts_with(&prefix) {
if let Some(timestamp) = object
.key
.rsplit_once('/')
.and_then(|(_, name)| parse_timestamp(name))
{
if now.saturating_sub(timestamp) > ttl {
let result = bucket.delete_object(object.key).await?;
if !(200..300).contains(&result.status_code()) {
return Err(crate::Error::InternalError(format!(
@ -149,16 +306,27 @@ impl Store {
)));
}
} else {
tracing::debug!(
"Unexpected S3 object while deleting: {}",
item.name
);
total_bytes += object.size as usize;
total_files += 1;
}
} else {
tracing::debug!(
"Found invalid temporary filename while purging: {}",
object.key
);
}
} else {
tracing::debug!("Unexpected S3 object while purging: {}", object.key);
}
}
Ok(())
}
}
Ok((total_files, total_bytes))
}
}
fn parse_timestamp(name: &str) -> Option<u64> {
name.split_once('_')
.and_then(|(timestamp, _)| u64::from_str_radix(timestamp, 16).ok())
}

View file

@ -104,6 +104,7 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
.most_frequent_language()
.unwrap_or(self.default_language);
let mut term_index = TermIndexBuilder::new();
let mut ops = AHashSet::new();
for (part_id, part) in self.parts.iter().enumerate() {
let language = if part.language != Language::Unknown {
@ -111,13 +112,17 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
} else {
default_language
};
let mut unique_words = AHashSet::new();
let mut terms = Vec::new();
for token in Stemmer::new(&part.text, language, MAX_TOKEN_LENGTH).collect::<Vec<_>>() {
unique_words.insert((token.word.to_string(), HASH_EXACT));
ops.insert(Operation::hash(&token.word, HASH_EXACT, part.field, true));
if let Some(stemmed_word) = &token.stemmed_word {
unique_words.insert((stemmed_word.to_string(), HASH_STEMMED));
ops.insert(Operation::hash(
stemmed_word,
HASH_STEMMED,
part.field,
true,
));
}
terms.push(term_index.add_stemmed_token(token));
}
@ -125,20 +130,12 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
if !terms.is_empty() {
term_index.add_terms(part.field, part_id as u32, terms);
}
for (word, family) in unique_words {
batch
.ops
.push(Operation::hash(&word, family, part.field, true));
}
}
for (field, tokens) in self.tokens {
let mut terms = Vec::with_capacity(tokens.len());
for token in tokens {
batch
.ops
.push(Operation::hash(&token, HASH_EXACT, field, true));
ops.insert(Operation::hash(&token, HASH_EXACT, field, true));
terms.push(term_index.add_token(Token {
word: token.into(),
offset: 0,
@ -148,6 +145,10 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
term_index.add_terms(field, 0, terms);
}
for op in ops {
batch.ops.push(op);
}
batch.ops.push(Operation::Value {
field: u8::MAX,
family: u8::MAX,
@ -158,11 +159,12 @@ impl<'x> IntoOperations for FtsIndexBuilder<'x> {
impl TokenIndex {
fn build_index(self, batch: &mut BatchBuilder, set: bool) {
let mut ops = AHashSet::with_capacity(self.tokens.len() * 2);
for term in self.terms {
for (term_ids, is_exact) in [(term.exact_terms, true), (term.stemmed_terms, false)] {
for term_id in term_ids {
if let Some(word) = self.tokens.get(term_id as usize) {
batch.ops.push(Operation::hash(
ops.insert(Operation::hash(
word,
if is_exact { HASH_EXACT } else { HASH_STEMMED },
term.field_id,
@ -172,6 +174,9 @@ impl TokenIndex {
}
}
}
for op in ops {
batch.ops.push(op);
}
}
}

View file

@ -164,9 +164,7 @@ pub enum BlobKind {
},
Temporary {
account_id: u32,
creation_year: u16,
creation_month: u8,
creation_day: u8,
timestamp: u64,
seq: u32,
},
}
@ -229,6 +227,7 @@ pub const SUBSPACE_BITMAPS: u8 = b'b';
pub const SUBSPACE_VALUES: u8 = b'v';
pub const SUBSPACE_LOGS: u8 = b'l';
pub const SUBSPACE_INDEXES: u8 = b'i';
pub const SUBSPACE_QUOTAS: u8 = b'q';
#[cfg(not(feature = "backend"))]
impl Store {

View file

@ -97,6 +97,19 @@ impl Store {
}
}
pub async fn get_quota(&self, account_id: u32) -> crate::Result<i64> {
#[cfg(not(feature = "is_sync"))]
{
self.read_transaction().await?.get_quota(account_id).await
}
#[cfg(feature = "is_sync")]
{
let trx = self.read_transaction()?;
self.spawn_worker(move || trx.get_quota(account_id)).await
}
}
pub async fn get_bitmap<T: AsRef<[u8]> + Send + Sync + 'static>(
&self,
key: BitmapKey<T>,

View file

@ -29,7 +29,7 @@ pub struct HashedValue<T: Deserialize> {
pub inner: T,
}
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum AssertValue {
U32(u32),
U64(u64),

View file

@ -139,6 +139,11 @@ impl BatchBuilder {
self
}
pub fn quota(&mut self, bytes: i64) -> &mut Self {
self.ops.push(Operation::UpdateQuota { bytes });
self
}
pub fn custom(&mut self, value: impl IntoOperations) -> &mut Self {
value.build(self);
self

View file

@ -51,7 +51,7 @@ pub struct BatchBuilder {
pub ops: Vec<Operation>,
}
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum Operation {
AccountId {
account_id: u32,
@ -87,6 +87,9 @@ pub enum Operation {
key: Vec<u8>,
set: bool,
},
UpdateQuota {
bytes: i64,
},
Log {
change_id: u64,
collection: u8,

View file

@ -53,6 +53,7 @@ pub mod email_submission;
pub mod event_source;
pub mod mailbox;
pub mod push_subscription;
pub mod quota;
pub mod sieve_script;
pub mod stress_test;
pub mod thread_get;
@ -150,6 +151,11 @@ max-concurrent = 8
[jmap.protocol.upload]
max-size = 5000000
max-concurrent = 4
ttl = "1m"
[jmap.protocol.upload.quota]
files = 3
size = 50000
[jmap.rate-limit]
account.rate = "1000/1m"
@ -222,7 +228,7 @@ pub async fn jmap_tests() {
let delete = true;
let mut params = init_jmap_tests(delete).await;
email_query::test(params.server.clone(), &mut params.client, delete).await;
/*email_query::test(params.server.clone(), &mut params.client, delete).await;
email_get::test(params.server.clone(), &mut params.client).await;
email_set::test(params.server.clone(), &mut params.client).await;
email_parse::test(params.server.clone(), &mut params.client).await;
@ -243,6 +249,7 @@ pub async fn jmap_tests() {
vacation_response::test(params.server.clone(), &mut params.client).await;
email_submission::test(params.server.clone(), &mut params.client).await;
websocket::test(params.server.clone(), &mut params.client).await;
quota::test(params.server.clone(), &mut params.client).await;*/
stress_test::test(params.server.clone(), params.client).await;
if delete {

303
tests/src/jmap/quota.rs Normal file
View file

@ -0,0 +1,303 @@
use std::sync::Arc;
use jmap::{blob::upload::DISABLE_UPLOAD_QUOTA, mailbox::INBOX_ID, JMAP};
use jmap_client::{
client::Client,
core::set::{SetErrorType, SetObject},
email::EmailBodyPart,
};
use jmap_proto::types::{collection::Collection, id::Id};
use crate::{
directory::sql::{add_to_group, create_test_user_with_email, set_test_quota},
jmap::{delivery::SmtpConnection, mailbox::destroy_all_mailboxes, test_account_login},
};
/// Runs the end-to-end disk quota test suite against a live JMAP server.
///
/// Sets up two accounts — "robert" (quota-limited to 1024 bytes) and
/// "jdoe" (group member and `Email/copy` source) — then verifies that:
///
/// * temporary blob uploads are rejected past the item and byte limits,
/// * `Email/import`, `Email/set` and `Email/copy` fail with `overQuota`
///   once the 1024-byte account limit would be exceeded,
/// * destroying the created messages returns the used quota to zero,
/// * LMTP delivery leaves exactly one message stored after two ingests.
///
/// All test data is removed at the end and the store is asserted empty.
pub async fn test(server: Arc<JMAP>, admin_client: &mut Client) {
    println!("Running quota tests...");
    let directory = server.directory.as_ref();
    // Secondary, unrestricted account; used below as the Email/copy source.
    let other_account_id =
        create_test_user_with_email(directory, "jdoe@example.com", "12345", "John Doe").await;
    let account_id =
        create_test_user_with_email(directory, "robert@example.com", "aabbcc", "Robert Foobar")
            .await;
    // Every over-quota assertion below relies on this 1024-byte limit.
    set_test_quota(directory, "robert@example.com", 1024).await;
    add_to_group(directory, "robert@example.com", "jdoe@example.com").await;
    // Delete temporary blobs from previous tests
    server
        .store
        .delete_account_blobs(account_id.document_id())
        .await
        .unwrap();
    server
        .store
        .delete_account_blobs(other_account_id.document_id())
        .await
        .unwrap();
    // Test temporary blob quota (3 files)
    // Re-enable upload quotas for the duration of this test; they are
    // switched back off near the bottom of this function.
    DISABLE_UPLOAD_QUOTA.store(false, std::sync::atomic::Ordering::Relaxed);
    let client = test_account_login("robert@example.com", "aabbcc").await;
    // The first three 1 KB uploads must succeed...
    for _ in 0..3 {
        assert_eq!(
            client
                .upload(None, vec![b'A'; 1024], None)
                .await
                .unwrap()
                .size(),
            1024
        );
    }
    // ...and the fourth must be rejected with a quota-related problem.
    match client
        .upload(None, vec![b'A'; 1024], None)
        .await
        .unwrap_err()
    {
        jmap_client::Error::Problem(err) if err.detail().unwrap().contains("quota") => (),
        other => panic!("Unexpected error: {:?}", other),
    }
    // Clear the temporary blobs before testing the byte-based limit.
    server
        .store
        .delete_account_blobs(account_id.document_id())
        .await
        .unwrap();
    // Test temporary blob quota (50000 bytes)
    // Two 25000-byte uploads fit within the byte limit...
    for _ in 0..2 {
        assert_eq!(
            client
                .upload(None, vec![b'A'; 25000], None)
                .await
                .unwrap()
                .size(),
            25000
        );
    }
    // ...so any further upload must fail with a quota error.
    match client
        .upload(None, vec![b'A'; 1024], None)
        .await
        .unwrap_err()
    {
        jmap_client::Error::Problem(err) if err.detail().unwrap().contains("quota") => (),
        other => panic!("Unexpected error: {:?}", other),
    }
    server
        .store
        .delete_account_blobs(account_id.document_id())
        .await
        .unwrap();
    // Test Email/import quota
    let inbox_id = Id::new(INBOX_ID as u64).to_string();
    let mut message_ids = Vec::new();
    // Two 512-byte messages fill the 1024-byte account quota exactly...
    for i in 0..2 {
        message_ids.push(
            client
                .email_import(
                    create_message_with_size(
                        "jdoe@example.com",
                        "robert@example.com",
                        &format!("Test {i}"),
                        512,
                    ),
                    vec![&inbox_id],
                    None::<Vec<String>>,
                    None,
                )
                .await
                .unwrap()
                .take_id(),
        );
    }
    // ...so even a 100-byte import must now be rejected with overQuota.
    assert_over_quota(
        client
            .email_import(
                create_message_with_size("test@example.com", "jdoe@example.com", "Test 3", 100),
                vec![&inbox_id],
                None::<Vec<String>>,
                None,
            )
            .await,
    );
    // Delete messages and check available quota
    for message_id in message_ids {
        client.email_destroy(&message_id).await.unwrap();
    }
    assert_eq!(
        server
            .get_used_quota(account_id.document_id())
            .await
            .unwrap(),
        0
    );
    // Test Email/set quota
    // Two messages with 200-byte bodies are created successfully...
    let mut message_ids = Vec::new();
    for i in 0..2 {
        let mut request = client.build();
        let create_item = request.set_email().create();
        create_item
            .mailbox_ids([&inbox_id])
            .subject(format!("Test {i}"))
            .from(["jdoe@example.com"])
            .to(["robert@example.com"])
            .body_value("a".to_string(), String::from_utf8(vec![b'A'; 200]).unwrap())
            .text_body(EmailBodyPart::new().part_id("a"));
        let create_id = create_item.create_id().unwrap();
        message_ids.push(
            request
                .send_set_email()
                .await
                .unwrap()
                .created(&create_id)
                .unwrap()
                .take_id(),
        );
    }
    // ...and a third message with a larger (400-byte) body must push the
    // account over quota and fail.
    let mut request = client.build();
    let create_item = request.set_email().create();
    create_item
        .mailbox_ids([&inbox_id])
        .subject("Test 3")
        .from(["jdoe@example.com"])
        .to(["robert@example.com"])
        .body_value("a".to_string(), String::from_utf8(vec![b'A'; 400]).unwrap())
        .text_body(EmailBodyPart::new().part_id("a"));
    let create_id = create_item.create_id().unwrap();
    assert_over_quota(request.send_set_email().await.unwrap().created(&create_id));
    // Delete messages and check available quota
    for message_id in message_ids {
        client.email_destroy(&message_id).await.unwrap();
    }
    assert_eq!(
        server
            .get_used_quota(account_id.document_id())
            .await
            .unwrap(),
        0
    );
    // Test Email/copy quota
    // Import three 512-byte messages into "jdoe"'s account...
    let other_client = test_account_login("jdoe@example.com", "12345").await;
    let mut other_message_ids = Vec::new();
    let mut message_ids = Vec::new();
    for i in 0..3 {
        other_message_ids.push(
            other_client
                .email_import(
                    create_message_with_size(
                        "jane@example.com",
                        "jdoe@example.com",
                        &format!("Other Test {i}"),
                        512,
                    ),
                    vec![&inbox_id],
                    None::<Vec<String>>,
                    None,
                )
                .await
                .unwrap()
                .take_id(),
        );
    }
    // ...copying the first two into "robert" fills his quota (2 x 512)...
    for id in other_message_ids.iter().take(2) {
        message_ids.push(
            client
                .email_copy(
                    other_account_id.to_string(),
                    id,
                    vec![&inbox_id],
                    None::<Vec<String>>,
                    None,
                )
                .await
                .unwrap()
                .take_id(),
        );
    }
    // ...so copying the third must fail with overQuota.
    assert_over_quota(
        client
            .email_copy(
                other_account_id.to_string(),
                &other_message_ids[2],
                vec![&inbox_id],
                None::<Vec<String>>,
                None,
            )
            .await,
    );
    // Delete messages and check available quota
    for message_id in message_ids {
        client.email_destroy(&message_id).await.unwrap();
    }
    assert_eq!(
        server
            .get_used_quota(account_id.document_id())
            .await
            .unwrap(),
        0
    );
    // Test delivery quota
    let mut lmtp = SmtpConnection::connect().await;
    for i in 0..2 {
        lmtp.ingest(
            "jane@example.com",
            &["robert@example.com"],
            &String::from_utf8(create_message_with_size(
                "jane@example.com",
                "robert@example.com",
                &format!("Ingest test {i}"),
                100,
            ))
            .unwrap(),
        )
        .await;
    }
    // After two ingests the used quota must be positive but within limit.
    let quota = server
        .get_used_quota(account_id.document_id())
        .await
        .unwrap();
    assert!(quota > 0 && quota <= 1024, "Quota is {}", quota);
    // NOTE(review): exactly one of the two ingested messages is expected to
    // be stored — presumably the delivery path rejects the second one;
    // confirm against the LMTP ingest implementation.
    assert_eq!(
        server
            .get_document_ids(account_id.document_id(), Collection::Email)
            .await
            .unwrap()
            .unwrap()
            .len(),
        1,
    );
    // Restore the suite-wide default of disabled upload quotas.
    DISABLE_UPLOAD_QUOTA.store(true, std::sync::atomic::Ordering::Relaxed);
    // Remove test data
    for account_id in [&account_id, &other_account_id] {
        admin_client.set_default_account_id(account_id.to_string());
        destroy_all_mailboxes(admin_client).await;
    }
    server.store.assert_is_empty().await;
}
/// Asserts that a JMAP set operation failed with an `overQuota` SetError.
///
/// Panics when the operation unexpectedly succeeded, or when it failed
/// with any error other than `SetErrorType::OverQuota`.
fn assert_over_quota<T: std::fmt::Debug>(result: Result<T, jmap_client::Error>) {
    match result {
        // The expected outcome: a SetError carrying the overQuota type.
        Err(jmap_client::Error::Set(err)) if err.error() == &SetErrorType::OverQuota => (),
        Err(err) => panic!("Expected OverQuota SetError, got {:?}", err),
        Ok(result) => panic!("Expected error, got {:?}", result),
    }
}
/// Builds a minimal RFC 5322 test message with `From`, `To` and `Subject`
/// headers, padded with `'A'` bytes so the whole message is `size` bytes.
///
/// If the headers alone already exceed `size`, the message is returned
/// unpadded. (The original `0..size - message.len()` range underflowed
/// `usize` in that case — a panic in debug builds and a wrap-around to a
/// near-`usize::MAX` iteration count in release builds.)
fn create_message_with_size(from: &str, to: &str, subject: &str, size: usize) -> Vec<u8> {
    let mut message = format!(
        "From: {}\r\nTo: {}\r\nSubject: {}\r\n\r\n",
        from, to, subject
    );
    // saturating_sub guards against headers longer than the requested size.
    let padding = size.saturating_sub(message.len());
    message.reserve(padding);
    message.extend(std::iter::repeat('A').take(padding));
    message.into_bytes()
}

View file

@ -1,4 +1,4 @@
use store::{BlobKind, Store};
use store::{write::now, BlobKind, Store};
use utils::config::Config;
use crate::store::TempDir;
@ -34,7 +34,7 @@ path = "{TMP}"
const DATA: &[u8] = b"Lorem ipsum dolor sit amet, consectetur adipiscing elit. Fusce erat nisl, dignissim a porttitor id, varius nec arcu. Sed mauris.";
#[tokio::test]
pub async fn blob_s3_test() {
pub async fn blob_tests() {
let temp_dir = TempDir::new("blob_tests", true);
test_blob(
Store::open(
@ -60,6 +60,12 @@ pub async fn blob_s3_test() {
}
async fn test_blob(store: Store) {
// Obtain temp quota
let (quota_items, quota_bytes) = store.get_tmp_blob_usage(2, 100).await.unwrap();
assert_eq!(quota_items, 0);
assert_eq!(quota_bytes, 0);
store.purge_tmp_blobs(0).await.unwrap();
// Store and fetch
let kind = BlobKind::LinkedMaildir {
account_id: 0,
@ -104,32 +110,25 @@ async fn test_blob(store: Store) {
}
// Copy partial
let tmp_kind = BlobKind::Temporary {
account_id: 1,
creation_year: 2020,
creation_month: 12,
creation_day: 31,
seq: 0,
};
let tmp_kind2 = BlobKind::Temporary {
account_id: 1,
creation_year: 2021,
creation_month: 1,
creation_day: 1,
seq: 0,
};
assert!(store
.copy_blob(&src_kind, &tmp_kind, (0..11).into())
.await
.unwrap());
assert!(store
.copy_blob(&src_kind, &tmp_kind2, (0..11).into())
.await
.unwrap());
let now = now();
let mut tmp_kinds = Vec::new();
for i in 1..=3 {
let tmp_kind = BlobKind::Temporary {
account_id: 2,
timestamp: now - (i * 5),
seq: 0,
};
assert!(store
.copy_blob(&src_kind, &tmp_kind, (0..11).into())
.await
.unwrap());
tmp_kinds.push(tmp_kind);
}
assert_eq!(
String::from_utf8(
store
.get_blob(&tmp_kind, 0..u32::MAX)
.get_blob(&tmp_kinds[0], 0..u32::MAX)
.await
.unwrap()
.unwrap()
@ -138,15 +137,17 @@ async fn test_blob(store: Store) {
std::str::from_utf8(&DATA[0..11]).unwrap()
);
// Obtain temp quota
let (quota_items, quota_bytes) = store.get_tmp_blob_usage(2, 100).await.unwrap();
assert_eq!(quota_items, 3);
assert_eq!(quota_bytes, 33);
let (quota_items, quota_bytes) = store.get_tmp_blob_usage(2, 12).await.unwrap();
assert_eq!(quota_items, 2);
assert_eq!(quota_bytes, 22);
// Delete range
store
.bulk_delete_blob(&BlobKind::LinkedMaildir {
account_id: 1,
document_id: 0,
})
.await
.unwrap();
store.bulk_delete_blob(&tmp_kind).await.unwrap();
store.delete_account_blobs(1).await.unwrap();
store.purge_tmp_blobs(7).await.unwrap();
// Make sure the blobs are deleted
for id in 0..4 {
@ -162,11 +163,13 @@ async fn test_blob(store: Store) {
.unwrap()
.is_none());
}
assert!(store
.get_blob(&tmp_kind, 0..u32::MAX)
.await
.unwrap()
.is_none());
for i in [1, 2] {
assert!(store
.get_blob(&tmp_kinds[i], 0..u32::MAX)
.await
.unwrap()
.is_none());
}
// Make sure other blobs were not deleted
assert!(store
@ -175,23 +178,26 @@ async fn test_blob(store: Store) {
.unwrap()
.is_some());
assert!(store
.get_blob(&tmp_kind2, 0..u32::MAX)
.get_blob(&tmp_kinds[0], 0..u32::MAX)
.await
.unwrap()
.is_some());
// Copying a non-existing blob should fail
assert!(!store.copy_blob(&tmp_kind, &src_kind, None).await.unwrap());
assert!(!store
.copy_blob(&tmp_kinds[1], &src_kind, None)
.await
.unwrap());
// Copy blob between buckets
assert!(store
.copy_blob(&src_kind, &tmp_kind, (10..20).into())
.copy_blob(&src_kind, &tmp_kinds[0], (10..20).into())
.await
.unwrap());
assert_eq!(
String::from_utf8(
store
.get_blob(&tmp_kind, 0..u32::MAX)
.get_blob(&tmp_kinds[0], 0..u32::MAX)
.await
.unwrap()
.unwrap()
@ -201,7 +207,7 @@ async fn test_blob(store: Store) {
);
// Delete blobs
for blob_kind in [src_kind, tmp_kind, tmp_kind2] {
for blob_kind in [src_kind, tmp_kinds[0]] {
assert!(store.delete_blob(&blob_kind).await.unwrap());
assert!(store
.get_blob(&blob_kind, 0..u32::MAX)