Removed local concurrency limiters, switch to global rate limiting

This commit is contained in:
mdecimus 2025-01-18 19:09:02 +01:00
parent e7c6be45d8
commit 8438435fbe
54 changed files with 439 additions and 11477 deletions

19
Cargo.lock generated
View file

@ -1212,7 +1212,6 @@ dependencies = [
"bincode",
"biscuit",
"chrono",
"dashmap",
"decancer",
"directory",
"dns-update",
@ -1629,20 +1628,6 @@ version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728"
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
@ -3254,7 +3239,6 @@ version = "0.11.2"
dependencies = [
"ahash 0.8.11",
"common",
"dashmap",
"directory",
"email",
"imap_proto",
@ -3510,7 +3494,6 @@ dependencies = [
"bincode",
"chrono",
"common",
"dashmap",
"directory",
"email",
"form-data",
@ -6459,7 +6442,6 @@ dependencies = [
"blake3",
"chrono",
"common",
"dashmap",
"directory",
"email",
"form_urlencoded",
@ -6846,7 +6828,6 @@ dependencies = [
"chrono",
"common",
"csv",
"dashmap",
"directory",
"ece",
"email",

View file

@ -60,7 +60,6 @@ zip = "2.1"
pwhash = "1.0.0"
xxhash-rust = { version = "0.8.5", features = ["xxh3"] }
psl = "2"
dashmap = "6.0"
aes-gcm-siv = "0.11.1"
biscuit = "0.7.0"
rsa = "0.9.2"

View file

@ -28,7 +28,10 @@ use utils::map::{
vec_map::VecMap,
};
use crate::{Server, KV_TOKEN_REVISION};
use crate::{
listener::limiter::{ConcurrencyLimiter, LimiterResult},
Server, KV_TOKEN_REVISION,
};
use super::{roles::RolePermissions, AccessToken, ResourceToken, TenantInfo};
@ -122,6 +125,17 @@ impl Server {
.unwrap_or_default(),
quota: principal.quota(),
permissions,
concurrent_imap_requests: self.core.imap.rate_concurrent.map(ConcurrencyLimiter::new),
concurrent_http_requests: self
.core
.jmap
.request_max_concurrent
.map(ConcurrencyLimiter::new),
concurrent_uploads: self
.core
.jmap
.upload_max_concurrent
.map(ConcurrencyLimiter::new),
obj_size: 0,
revision,
};
@ -647,6 +661,24 @@ impl AccessToken {
}
}
pub fn is_http_request_allowed(&self) -> LimiterResult {
self.concurrent_http_requests
.as_ref()
.map_or(LimiterResult::Disabled, |limiter| limiter.is_allowed())
}
pub fn is_imap_request_allowed(&self) -> LimiterResult {
self.concurrent_imap_requests
.as_ref()
.map_or(LimiterResult::Disabled, |limiter| limiter.is_allowed())
}
pub fn is_upload_allowed(&self) -> LimiterResult {
self.concurrent_uploads
.as_ref()
.map_or(LimiterResult::Disabled, |limiter| limiter.is_allowed())
}
pub fn update_size(mut self) -> Self {
self.obj_size = (std::mem::size_of::<AccessToken>()
+ (self.member_of.len() * std::mem::size_of::<u32>())

View file

@ -17,14 +17,14 @@ use utils::{
map::{bitmap::Bitmap, vec_map::VecMap},
};
use crate::Server;
use crate::{listener::limiter::ConcurrencyLimiter, Server};
pub mod access_token;
pub mod oauth;
pub mod roles;
pub mod sasl;
#[derive(Debug, Clone, Default)]
#[derive(Debug, Default)]
pub struct AccessToken {
pub primary_id: u32,
pub member_of: Vec<u32>,
@ -35,6 +35,9 @@ pub struct AccessToken {
pub quota: u64,
pub permissions: Permissions,
pub tenant: Option<TenantInfo>,
pub concurrent_http_requests: Option<ConcurrencyLimiter>,
pub concurrent_imap_requests: Option<ConcurrencyLimiter>,
pub concurrent_uploads: Option<ConcurrencyLimiter>,
pub revision: u64,
pub obj_size: u64,
}

View file

@ -9,9 +9,8 @@ use std::{
sync::Arc,
};
use ahash::{AHashMap, AHashSet, RandomState};
use ahash::{AHashMap, AHashSet};
use arc_swap::ArcSwap;
use dashmap::DashMap;
use mail_auth::{Parameters, Txt, MX};
use mail_send::smtp::tls::build_tls_connector;
use nlp::bayes::{TokenHash, Weights};
@ -28,7 +27,7 @@ use crate::{
listener::blocked::BlockedIps,
manager::webadmin::WebAdminManager,
Account, AccountId, Caches, Data, Mailbox, MailboxId, MailboxState, NextMailboxState, Threads,
ThrottleKeyHasherBuilder, TlsConnectors,
TlsConnectors,
};
use super::server::tls::{build_self_signed_cert, parse_certificates};
@ -43,13 +42,6 @@ impl Data {
subject_names.insert("localhost".to_string());
}
// Parse capacities
let shard_amount = config
.property::<u64>("limiter.shard")
.unwrap_or_else(|| (num_cpus::get() * 2) as u64)
.next_power_of_two() as usize;
let capacity = config.property("limiter.capacity").unwrap_or(100);
// Parse id generator
let id_generator = config
.property::<u64>("cluster.node-id")
@ -78,27 +70,7 @@ impl Data {
.map(|path| WebAdminManager::new(path.into()))
.unwrap_or_default(),
config_version: 0.into(),
jmap_limiter: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
RandomState::default(),
shard_amount,
),
imap_limiter: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
RandomState::default(),
shard_amount,
),
logos: Default::default(),
smtp_session_throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
ThrottleKeyHasherBuilder::default(),
shard_amount,
),
smtp_queue_throttle: DashMap::with_capacity_and_hasher_and_shard_amount(
capacity,
ThrottleKeyHasherBuilder::default(),
shard_amount,
),
smtp_connectors: TlsConnectors::default(),
asn_geo_data: Default::default(),
}
@ -248,11 +220,7 @@ impl Default for Data {
queue_status: true.into(),
webadmin: Default::default(),
config_version: Default::default(),
jmap_limiter: Default::default(),
imap_limiter: Default::default(),
logos: Default::default(),
smtp_session_throttle: Default::default(),
smtp_queue_throttle: Default::default(),
smtp_connectors: Default::default(),
asn_geo_data: Default::default(),
}

View file

@ -24,9 +24,11 @@ impl JmapConfig {
Capability::Core,
Capabilities::Core(CoreCapabilities {
max_size_upload: self.upload_max_size,
max_concurrent_upload: self.upload_max_concurrent as usize,
max_concurrent_upload: self.upload_max_concurrent.unwrap_or(u32::MAX as u64)
as usize,
max_size_request: self.request_max_size,
max_concurrent_requests: self.request_max_concurrent as usize,
max_concurrent_requests: self.request_max_concurrent.unwrap_or(u32::MAX as u64)
as usize,
max_calls_in_request: self.request_max_calls,
max_objects_in_get: self.get_max_objects,
max_objects_in_set: self.set_max_objects,

View file

@ -21,13 +21,13 @@ pub struct JmapConfig {
pub request_max_size: usize,
pub request_max_calls: usize,
pub request_max_concurrent: u64,
pub request_max_concurrent: Option<u64>,
pub get_max_objects: usize,
pub set_max_objects: usize,
pub upload_max_size: usize,
pub upload_max_concurrent: u64,
pub upload_max_concurrent: Option<u64>,
pub upload_tmp_quota_size: usize,
pub upload_tmp_quota_amount: usize,
@ -72,7 +72,6 @@ pub struct JmapConfig {
pub encrypt_append: bool,
pub capabilities: BaseCapabilities,
pub session_purge_frequency: SimpleCron,
pub account_purge_frequency: SimpleCron,
}
@ -254,8 +253,8 @@ impl JmapConfig {
.property("jmap.protocol.request.max-calls")
.unwrap_or(16),
request_max_concurrent: config
.property("jmap.protocol.request.max-concurrent")
.unwrap_or(4),
.property_or_default::<Option<u64>>("jmap.protocol.request.max-concurrent", "4")
.unwrap_or(Some(4)),
get_max_objects: config
.property("jmap.protocol.get.max-objects")
.unwrap_or(500),
@ -266,8 +265,8 @@ impl JmapConfig {
.property("jmap.protocol.upload.max-size")
.unwrap_or(50000000),
upload_max_concurrent: config
.property("jmap.protocol.upload.max-concurrent")
.unwrap_or(4),
.property_or_default::<Option<u64>>("jmap.protocol.upload.max-concurrent", "4")
.unwrap_or(Some(4)),
upload_tmp_quota_size: config
.property("jmap.protocol.upload.quota.size")
.unwrap_or(50000000),
@ -346,9 +345,6 @@ impl JmapConfig {
push_throttle: config
.property_or_default("jmap.push.throttle", "1s")
.unwrap_or_else(|| Duration::from_secs(1)),
session_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.session.purge.frequency", "15 * *")
.unwrap_or_else(|| SimpleCron::parse_value("15 * *").unwrap()),
account_purge_frequency: config
.property_or_default::<SimpleCron>("jmap.account.purge.frequency", "0 0 *")
.unwrap_or_else(|| SimpleCron::parse_value("0 0 *").unwrap()),

View file

@ -33,12 +33,11 @@ pub struct SmtpConfig {
#[derive(Debug, Default, Clone)]
#[cfg_attr(feature = "test_mode", derive(PartialEq, Eq))]
pub struct Throttle {
pub struct QueueRateLimiter {
pub id: String,
pub expr: Expression,
pub keys: u16,
pub concurrency: Option<u64>,
pub rate: Option<Rate>,
pub rate: Rate,
}
pub const THROTTLE_RCPT: u16 = 1 << 0;

View file

@ -7,17 +7,15 @@
use ahash::AHashMap;
use mail_auth::IpLookupStrategy;
use mail_send::Credentials;
use utils::config::{
utils::{AsKey, ParseValue},
Config,
};
use throttle::parse_queue_rate_limiter_key;
use utils::config::{utils::ParseValue, Config};
use crate::{
config::server::ServerProtocol,
expr::{if_block::IfBlock, *},
};
use self::throttle::{parse_throttle, parse_throttle_key};
use self::throttle::parse_queue_rate_limiter;
use super::*;
@ -41,9 +39,11 @@ pub struct QueueConfig {
// Timeouts
pub timeout: QueueOutboundTimeout,
// Throttle and Quotas
pub throttle: QueueThrottle,
// Rate limits
pub inbound_limiters: QueueRateLimiters,
pub outbound_limiters: QueueRateLimiters,
pub quota: QueueQuotas,
pub max_threads: usize,
// Relay hosts
pub relay_hosts: AHashMap<String, RelayHost>,
@ -82,15 +82,14 @@ pub struct QueueOutboundTimeout {
pub mta_sts: IfBlock,
}
#[derive(Debug, Clone)]
pub struct QueueThrottle {
pub outbound_concurrency: usize,
pub sender: Vec<Throttle>,
pub rcpt: Vec<Throttle>,
pub host: Vec<Throttle>,
#[derive(Debug, Clone, Default)]
pub struct QueueRateLimiters {
pub sender: Vec<QueueRateLimiter>,
pub rcpt: Vec<QueueRateLimiter>,
pub remote: Vec<QueueRateLimiter>,
}
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct QueueQuotas {
pub sender: Vec<QueueQuota>,
pub rcpt: Vec<QueueQuota>,
@ -201,17 +200,10 @@ impl Default for QueueConfig {
data: IfBlock::new::<()>("queue.outbound.timeouts.data", [], "10m"),
mta_sts: IfBlock::new::<()>("queue.outbound.timeouts.mta-sts", [], "10m"),
},
throttle: QueueThrottle {
outbound_concurrency: 25,
sender: Default::default(),
rcpt: Default::default(),
host: Default::default(),
},
quota: QueueQuotas {
sender: Default::default(),
rcpt: Default::default(),
rcpt_domain: Default::default(),
},
max_threads: 25,
inbound_limiters: QueueRateLimiters::default(),
outbound_limiters: QueueRateLimiters::default(),
quota: QueueQuotas::default(),
relay_hosts: Default::default(),
}
}
@ -324,8 +316,13 @@ impl QueueConfig {
}
}
// Parse queue quotas and throttles
queue.throttle = parse_queue_throttle(config);
// Parse rate limiters
queue.max_threads = config
.property_or_default::<usize>("queue.threads.remote", "25")
.unwrap_or(25)
.max(1);
queue.inbound_limiters = parse_inbound_rate_limters(config);
queue.outbound_limiters = parse_outbound_rate_limiters(config);
queue.quota = parse_queue_quota(config);
// Parse relay hosts
@ -380,21 +377,60 @@ fn parse_relay_host(config: &mut Config, id: &str) -> Option<RelayHost> {
})
}
fn parse_queue_throttle(config: &mut Config) -> QueueThrottle {
// Parse throttle
let mut throttle = QueueThrottle {
sender: Vec::new(),
rcpt: Vec::new(),
host: Vec::new(),
outbound_concurrency: config
.property_or_default::<usize>("queue.threads.remote", "25")
.unwrap_or(25)
.max(1),
};
let all_throttles = parse_throttle(
fn parse_inbound_rate_limters(config: &mut Config) -> QueueRateLimiters {
let mut throttle = QueueRateLimiters::default();
let all_throttles = parse_queue_rate_limiter(
config,
"queue.throttle",
"queue.limiter.inbound",
&TokenMap::default().with_variables(SMTP_RCPT_TO_VARS),
THROTTLE_LISTENER
| THROTTLE_REMOTE_IP
| THROTTLE_LOCAL_IP
| THROTTLE_AUTH_AS
| THROTTLE_HELO_DOMAIN
| THROTTLE_RCPT
| THROTTLE_RCPT_DOMAIN
| THROTTLE_SENDER
| THROTTLE_SENDER_DOMAIN,
);
for t in all_throttles {
if (t.keys & (THROTTLE_RCPT | THROTTLE_RCPT_DOMAIN)) != 0
|| t.expr.items().iter().any(|c| {
matches!(
c,
ExpressionItem::Variable(V_RECIPIENT | V_RECIPIENT_DOMAIN)
)
})
{
throttle.rcpt.push(t);
} else if (t.keys
& (THROTTLE_SENDER | THROTTLE_SENDER_DOMAIN | THROTTLE_HELO_DOMAIN | THROTTLE_AUTH_AS))
!= 0
|| t.expr.items().iter().any(|c| {
matches!(
c,
ExpressionItem::Variable(
V_SENDER | V_SENDER_DOMAIN | V_HELO_DOMAIN | V_AUTHENTICATED_AS
)
)
})
{
throttle.sender.push(t);
} else {
throttle.remote.push(t);
}
}
throttle
}
fn parse_outbound_rate_limiters(config: &mut Config) -> QueueRateLimiters {
// Parse throttle
let mut throttle = QueueRateLimiters::default();
let all_throttles = parse_queue_rate_limiter(
config,
"queue.limiter.outbound",
&TokenMap::default().with_variables(SMTP_QUEUE_HOST_VARS),
THROTTLE_RCPT_DOMAIN
| THROTTLE_SENDER
@ -410,7 +446,7 @@ fn parse_queue_throttle(config: &mut Config) -> QueueThrottle {
.iter()
.any(|c| matches!(c, ExpressionItem::Variable(V_MX | V_REMOTE_IP | V_LOCAL_IP)))
{
throttle.host.push(t);
throttle.remote.push(t);
} else if (t.keys & (THROTTLE_RCPT_DOMAIN)) != 0
|| t.expr
.items()
@ -481,7 +517,7 @@ fn parse_queue_quota_item(config: &mut Config, prefix: impl AsKey, id: &str) ->
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<Vec<_>>()
{
match parse_throttle_key(&value) {
match parse_queue_rate_limiter_key(&value) {
Ok(key) => {
if (key
& (THROTTLE_RCPT_DOMAIN

View file

@ -24,7 +24,7 @@ use crate::{
expr::{if_block::IfBlock, tokenizer::TokenMap, *},
};
use self::{resolver::Policy, throttle::parse_throttle};
use self::resolver::Policy;
use super::*;
@ -33,7 +33,6 @@ pub struct SessionConfig {
pub timeout: IfBlock,
pub duration: IfBlock,
pub transfer_limit: IfBlock,
pub throttle: SessionThrottle,
pub connect: Connect,
pub ehlo: Ehlo,
@ -48,13 +47,6 @@ pub struct SessionConfig {
pub hooks: Vec<MTAHook>,
}
#[derive(Default, Debug, Clone)]
pub struct SessionThrottle {
pub connect: Vec<Throttle>,
pub mail_from: Vec<Throttle>,
pub rcpt_to: Vec<Throttle>,
}
#[derive(Clone)]
pub struct Connect {
pub hostname: IfBlock,
@ -222,7 +214,6 @@ impl SessionConfig {
.into_iter()
.filter_map(|id| parse_hooks(config, &id, &has_rcpt_vars))
.collect();
session.throttle = SessionThrottle::parse(config);
session.mta_sts_policy = Policy::try_parse(config);
for (value, key, token_map) in [
@ -460,58 +451,6 @@ impl SessionConfig {
}
}
impl SessionThrottle {
pub fn parse(config: &mut Config) -> Self {
let mut throttle = SessionThrottle::default();
let all_throttles = parse_throttle(
config,
"session.throttle",
&TokenMap::default().with_variables(SMTP_RCPT_TO_VARS),
THROTTLE_LISTENER
| THROTTLE_REMOTE_IP
| THROTTLE_LOCAL_IP
| THROTTLE_AUTH_AS
| THROTTLE_HELO_DOMAIN
| THROTTLE_RCPT
| THROTTLE_RCPT_DOMAIN
| THROTTLE_SENDER
| THROTTLE_SENDER_DOMAIN,
);
for t in all_throttles {
if (t.keys & (THROTTLE_RCPT | THROTTLE_RCPT_DOMAIN)) != 0
|| t.expr.items().iter().any(|c| {
matches!(
c,
ExpressionItem::Variable(V_RECIPIENT | V_RECIPIENT_DOMAIN)
)
})
{
throttle.rcpt_to.push(t);
} else if (t.keys
& (THROTTLE_SENDER
| THROTTLE_SENDER_DOMAIN
| THROTTLE_HELO_DOMAIN
| THROTTLE_AUTH_AS))
!= 0
|| t.expr.items().iter().any(|c| {
matches!(
c,
ExpressionItem::Variable(
V_SENDER | V_SENDER_DOMAIN | V_HELO_DOMAIN | V_AUTHENTICATED_AS
)
)
})
{
throttle.mail_from.push(t);
} else {
throttle.connect.push(t);
}
}
throttle
}
}
fn parse_milter(config: &mut Config, id: &str, token_map: &TokenMap) -> Option<Milter> {
let hostname = config
.value_require(("session.milter", id, "hostname"))?
@ -693,11 +632,6 @@ impl Default for SessionConfig {
timeout: IfBlock::new::<()>("session.timeout", [], "5m"),
duration: IfBlock::new::<()>("session.duration", [], "10m"),
transfer_limit: IfBlock::new::<()>("session.transfer-limit", [], "262144000"),
throttle: SessionThrottle {
connect: Default::default(),
mail_from: Default::default(),
rcpt_to: Default::default(),
},
connect: Connect {
hostname: IfBlock::new::<()>(
"server.connect.hostname",

View file

@ -10,44 +10,44 @@ use crate::expr::{tokenizer::TokenMap, Expression};
use super::*;
pub fn parse_throttle(
pub fn parse_queue_rate_limiter(
config: &mut Config,
prefix: impl AsKey,
token_map: &TokenMap,
available_throttle_keys: u16,
) -> Vec<Throttle> {
available_rate_limiter_keys: u16,
) -> Vec<QueueRateLimiter> {
let prefix_ = prefix.as_key();
let mut throttles = Vec::new();
for throttle_id in config
let mut rate_limiters = Vec::new();
for rate_limiter_id in config
.sub_keys(prefix, "")
.map(|s| s.to_string())
.collect::<Vec<_>>()
{
let throttle_id = throttle_id.as_str();
if let Some(throttle) = parse_throttle_item(
let rate_limiter_id = rate_limiter_id.as_str();
if let Some(rate_limiter) = parse_queue_rate_limiter_item(
config,
(&prefix_, throttle_id),
throttle_id,
(&prefix_, rate_limiter_id),
rate_limiter_id,
token_map,
available_throttle_keys,
available_rate_limiter_keys,
) {
throttles.push(throttle);
rate_limiters.push(rate_limiter);
}
}
throttles
rate_limiters
}
fn parse_throttle_item(
fn parse_queue_rate_limiter_item(
config: &mut Config,
prefix: impl AsKey,
throttle_id: &str,
rate_limiter_id: &str,
token_map: &TokenMap,
available_throttle_keys: u16,
) -> Option<Throttle> {
available_rate_limiter_keys: u16,
) -> Option<QueueRateLimiter> {
let prefix = prefix.as_key();
// Skip disabled throttles
// Skip disabled rate_limiters
if !config
.property::<bool>((prefix.as_str(), "enable"))
.unwrap_or(true)
@ -61,12 +61,13 @@ fn parse_throttle_item(
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<Vec<_>>()
{
match parse_throttle_key(&value) {
match parse_queue_rate_limiter_key(&value) {
Ok(key) => {
if (key & available_throttle_keys) != 0 {
if (key & available_rate_limiter_keys) != 0 {
keys |= key;
} else {
let err = format!("Throttle key {value:?} is not available in this context");
let err =
format!("Rate limiter key {value:?} is not available in this context");
config.new_build_error(key_, err);
}
}
@ -76,38 +77,18 @@ fn parse_throttle_item(
}
}
let throttle = Throttle {
id: throttle_id.to_string(),
Some(QueueRateLimiter {
id: rate_limiter_id.to_string(),
expr: Expression::try_parse(config, (prefix.as_str(), "match"), token_map)
.unwrap_or_default(),
keys,
concurrency: config
.property::<Option<u64>>((prefix.as_str(), "concurrency"))
.filter(|&v| v.as_ref().is_some_and(|v| *v > 0))
.unwrap_or_default(),
rate: config
.property::<Option<Rate>>((prefix.as_str(), "rate"))
.filter(|v| v.as_ref().is_some_and(|r| r.requests > 0))
.unwrap_or_default(),
};
// Validate
if throttle.rate.is_none() && throttle.concurrency.is_none() {
config.new_parse_error(
prefix.as_str(),
concat!(
"Throttle needs to define a ",
"valid 'rate' and/or 'concurrency' property."
)
.to_string(),
);
None
} else {
Some(throttle)
}
.property_require::<Rate>((prefix.as_str(), "rate"))
.filter(|r| r.requests > 0)?,
})
}
pub(crate) fn parse_throttle_key(value: &str) -> Result<u16, String> {
pub(crate) fn parse_queue_rate_limiter_key(value: &str) -> Result<u16, String> {
match value {
"rcpt" => Ok(THROTTLE_RCPT),
"rcpt_domain" => Ok(THROTTLE_RCPT_DOMAIN),
@ -119,6 +100,6 @@ pub(crate) fn parse_throttle_key(value: &str) -> Result<u16, String> {
"remote_ip" => Ok(THROTTLE_REMOTE_IP),
"local_ip" => Ok(THROTTLE_LOCAL_IP),
"helo_domain" => Ok(THROTTLE_HELO_DOMAIN),
_ => Err(format!("Invalid throttle key {value:?}")),
_ => Err(format!("Invalid THROTTLE key {value:?}")),
}
}

View file

@ -17,12 +17,9 @@ use store::{BlobStore, InMemoryStore, Store};
use tokio::sync::mpsc;
use utils::map::bitmap::Bitmap;
use crate::{
config::smtp::{
report::AggregateFrequency,
resolver::{Policy, Tlsa},
},
listener::limiter::ConcurrencyLimiter,
use crate::config::smtp::{
report::AggregateFrequency,
resolver::{Policy, Tlsa},
};
pub enum HousekeeperEvent {
@ -108,22 +105,10 @@ pub enum QueueEvent {
#[derive(Debug)]
pub enum QueueEventStatus {
Completed,
Locked {
until: u64,
},
Limited {
limiters: Vec<ConcurrencyLimiter>,
next_due: Option<u64>,
},
Locked { until: u64 },
Deferred,
}
#[derive(Debug, Clone, Copy)]
pub struct QueuedMessage {
pub due: u64,
pub queue_id: u64,
}
#[derive(Debug)]
pub enum ReportingEvent {
Dmarc(Box<DmarcEvent>),

View file

@ -14,7 +14,7 @@ use std::{
},
};
use ahash::{AHashMap, AHashSet, RandomState};
use ahash::{AHashMap, AHashSet};
use arc_swap::ArcSwap;
use auth::{oauth::config::OAuthConfig, roles::RolePermissions, AccessToken};
use config::{
@ -30,13 +30,10 @@ use config::{
storage::Storage,
telemetry::Metrics,
};
use dashmap::DashMap;
use imap_proto::protocol::list::Attribute;
use ipc::{HousekeeperEvent, QueueEvent, ReportingEvent, StateEvent};
use listener::{
asn::AsnGeoLookupData, blocked::Security, limiter::ConcurrencyLimiter, tls::AcmeProviders,
};
use listener::{asn::AsnGeoLookupData, blocked::Security, tls::AcmeProviders};
use mail_auth::{Txt, MX};
use manager::webadmin::{Resource, WebAdminManager};
@ -125,15 +122,9 @@ pub struct Data {
pub queue_status: AtomicBool,
pub webadmin: WebAdminManager,
pub logos: Mutex<AHashMap<String, Option<Resource<Vec<u8>>>>>,
pub config_version: AtomicU8,
pub jmap_limiter: DashMap<u32, Arc<ConcurrencyLimiters>, RandomState>,
pub imap_limiter: DashMap<u32, Arc<ConcurrencyLimiters>, RandomState>,
pub logos: Mutex<AHashMap<String, Option<Resource<Vec<u8>>>>>,
pub smtp_session_throttle: DashMap<ThrottleKey, ConcurrencyLimiter, ThrottleKeyHasherBuilder>,
pub smtp_queue_throttle: DashMap<ThrottleKey, ConcurrencyLimiter, ThrottleKeyHasherBuilder>,
pub smtp_connectors: TlsConnectors,
}
@ -245,11 +236,6 @@ pub struct Threads {
pub modseq: Option<u64>,
}
pub struct ConcurrencyLimiters {
pub concurrent_requests: ConcurrencyLimiter,
pub concurrent_uploads: ConcurrencyLimiter,
}
#[derive(Clone, Default)]
pub struct Core {
pub storage: Storage,
@ -382,12 +368,6 @@ impl BuildHasher for ThrottleKeyHasherBuilder {
}
}
impl ConcurrencyLimiters {
pub fn is_active(&self) -> bool {
self.concurrent_requests.is_active() || self.concurrent_uploads.is_active()
}
}
#[cfg(feature = "test_mode")]
#[allow(clippy::derivable_impls)]
impl Default for Server {

View file

@ -4,22 +4,11 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::SystemTime,
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc,
};
use utils::config::Rate;
#[derive(Debug)]
pub struct RateLimiter {
next_refill: AtomicU64,
used_tokens: AtomicU64,
}
#[derive(Debug, Clone)]
pub struct ConcurrencyLimiter {
pub max_concurrent: u64,
@ -31,53 +20,18 @@ pub struct InFlight {
concurrent: Arc<AtomicU64>,
}
pub enum LimiterResult {
Allowed(InFlight),
Forbidden,
Disabled,
}
impl Drop for InFlight {
fn drop(&mut self) {
self.concurrent.fetch_sub(1, Ordering::Relaxed);
}
}
impl RateLimiter {
pub fn new(rate: &Rate) -> Self {
RateLimiter {
next_refill: (now() + rate.period.as_secs()).into(),
used_tokens: 0.into(),
}
}
pub fn is_allowed(&self, rate: &Rate) -> bool {
// Check rate limit
if self.used_tokens.fetch_add(1, Ordering::Relaxed) < rate.requests {
true
} else {
let now = now();
if self.next_refill.load(Ordering::Relaxed) <= now {
self.next_refill
.store(now + rate.period.as_secs(), Ordering::Relaxed);
self.used_tokens.store(1, Ordering::Relaxed);
true
} else {
false
}
}
}
pub fn is_allowed_soft(&self, rate: &Rate) -> bool {
self.used_tokens.load(Ordering::Relaxed) < rate.requests
|| self.next_refill.load(Ordering::Relaxed) <= now()
}
pub fn secs_to_refill(&self) -> u64 {
self.next_refill
.load(Ordering::Relaxed)
.saturating_sub(now())
}
pub fn is_active(&self) -> bool {
self.next_refill.load(Ordering::Relaxed) > now()
}
}
impl ConcurrencyLimiter {
pub fn new(max_concurrent: u64) -> Self {
ConcurrencyLimiter {
@ -86,15 +40,15 @@ impl ConcurrencyLimiter {
}
}
pub fn is_allowed(&self) -> Option<InFlight> {
pub fn is_allowed(&self) -> LimiterResult {
if self.concurrent.load(Ordering::Relaxed) < self.max_concurrent {
// Return in-flight request
self.concurrent.fetch_add(1, Ordering::Relaxed);
Some(InFlight {
LimiterResult::Allowed(InFlight {
concurrent: self.concurrent.clone(),
})
} else {
None
LimiterResult::Forbidden
}
}
@ -113,9 +67,12 @@ impl InFlight {
}
}
fn now() -> u64 {
SystemTime::UNIX_EPOCH
.elapsed()
.unwrap_or_default()
.as_secs()
impl From<LimiterResult> for Option<InFlight> {
fn from(result: LimiterResult) -> Self {
match result {
LimiterResult::Allowed(in_flight) => Some(in_flight),
LimiterResult::Forbidden => None,
LimiterResult::Disabled => Some(InFlight::default()),
}
}
}

View file

@ -24,8 +24,8 @@ use crate::{
};
use super::{
limiter::ConcurrencyLimiter, ServerInstance, SessionData, SessionManager, SessionStream,
TcpAcceptor,
limiter::{ConcurrencyLimiter, LimiterResult},
ServerInstance, SessionData, SessionManager, SessionStream, TcpAcceptor,
};
impl Listener {
@ -234,7 +234,7 @@ impl BuildSession for Arc<ServerInstance> {
RemotePort = remote_port,
);
None
} else if let Some(in_flight) = self.limiter.is_allowed() {
} else if let LimiterResult::Allowed(in_flight) = self.limiter.is_allowed() {
// Enforce concurrency
SessionData {
stream,

View file

@ -292,16 +292,13 @@ impl BootManager {
("queue.quota.size.messages", "100000"),
("queue.quota.size.size", "10737418240"),
("queue.quota.size.enable", "true"),
("queue.throttle.rcpt.key", "rcpt_domain"),
("queue.throttle.rcpt.concurrency", "5"),
("queue.throttle.rcpt.enable", "true"),
("session.throttle.ip.key", "remote_ip"),
("session.throttle.ip.concurrency", "5"),
("session.throttle.ip.enable", "true"),
("session.throttle.sender.key.0", "sender_domain"),
("session.throttle.sender.key.1", "rcpt"),
("session.throttle.sender.rate", "25/1h"),
("session.throttle.sender.enable", "true"),
("queue.limiter.inbound.ip.key", "remote_ip"),
("queue.limiter.inbound.ip.rate", "5/1s"),
("queue.limiter.inbound.ip.enable", "true"),
("queue.limiter.inbound.sender.key.0", "sender_domain"),
("queue.limiter.inbound.sender.key.1", "rcpt"),
("queue.limiter.inbound.sender.rate", "25/1h"),
("queue.limiter.inbound.sender.enable", "true"),
("report.analysis.addresses", "postmaster@*"),
] {
insert_keys.push(ConfigKey::from(key));

View file

@ -24,7 +24,6 @@ tokio-rustls = { version = "0.26", default-features = false, features = ["ring",
parking_lot = "0.12"
ahash = { version = "0.8" }
md5 = "0.7.0"
dashmap = "6.0"
rand = "0.8.5"

View file

@ -7,8 +7,8 @@
use std::{iter::Peekable, sync::Arc, vec::IntoIter};
use common::{
listener::{limiter::ConcurrencyLimiter, SessionResult, SessionStream},
ConcurrencyLimiters, KV_RATE_LIMIT_IMAP,
listener::{SessionResult, SessionStream},
KV_RATE_LIMIT_IMAP,
};
use imap_proto::{
receiver::{self, Request},
@ -419,29 +419,6 @@ impl<T: SessionStream> Session<T> {
},
}
}
pub fn get_concurrency_limiter(&self, account_id: u32) -> Option<Arc<ConcurrencyLimiters>> {
let rate = self.server.core.imap.rate_concurrent?;
self.server
.inner
.data
.imap_limiter
.get(&account_id)
.map(|limiter| limiter.clone())
.unwrap_or_else(|| {
let limiter = Arc::new(ConcurrencyLimiters {
concurrent_requests: ConcurrencyLimiter::new(rate),
concurrent_uploads: ConcurrencyLimiter::new(rate),
});
self.server
.inner
.data
.imap_limiter
.insert(account_id, limiter.clone());
limiter
})
.into()
}
}
impl<T: SessionStream> State<T> {

View file

@ -9,7 +9,7 @@ use common::{
sasl::{sasl_decode_challenge_oauth, sasl_decode_challenge_plain},
AuthRequest,
},
listener::SessionStream,
listener::{limiter::LimiterResult, SessionStream},
};
use directory::Permission;
use imap_proto::{
@ -105,17 +105,14 @@ impl<T: SessionStream> Session<T> {
})?;
// Enforce concurrency limits
let in_flight = match self
.get_concurrency_limiter(access_token.primary_id())
.map(|limiter| limiter.concurrent_requests.is_allowed())
{
Some(Some(limiter)) => Some(limiter),
None => None,
Some(None) => {
let in_flight = match access_token.is_imap_request_allowed() {
LimiterResult::Allowed(in_flight) => Some(in_flight),
LimiterResult::Forbidden => {
return Err(trc::LimitEvent::ConcurrentRequest
.into_err()
.id(tag.clone()));
.id(tag.clone()))
}
LimiterResult::Disabled => None,
};
// Create session

View file

@ -42,7 +42,6 @@ reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-
tokio-tungstenite = "0.26"
tungstenite = "0.26"
chrono = "0.4"
dashmap = "6.0"
rand = "0.8.5"
pkcs8 = { version = "0.10.2", features = ["alloc", "std"] }
lz4_flex = { version = "0.11", default-features = false }

View file

@ -24,7 +24,7 @@ pub trait Authenticator: Sync + Send {
req: &HttpRequest,
session: &HttpSessionData,
allow_api_access: bool,
) -> impl Future<Output = trc::Result<(InFlight, Arc<AccessToken>)>> + Send;
) -> impl Future<Output = trc::Result<(Option<InFlight>, Arc<AccessToken>)>> + Send;
}
impl Authenticator for Server {
@ -33,7 +33,7 @@ impl Authenticator for Server {
req: &HttpRequest,
session: &HttpSessionData,
allow_api_access: bool,
) -> trc::Result<(InFlight, Arc<AccessToken>)> {
) -> trc::Result<(Option<InFlight>, Arc<AccessToken>)> {
if let Some((mechanism, token)) = req.authorization() {
// Check if the credentials are cached
if let Some(http_cache) = self.inner.cache.http_auth.get(token) {

View file

@ -4,12 +4,12 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{net::IpAddr, sync::Arc};
use std::net::IpAddr;
use common::{
ip_to_bytes,
listener::limiter::{ConcurrencyLimiter, InFlight},
ConcurrencyLimiters, Server, KV_RATE_LIMIT_HTTP_ANONYMOUS, KV_RATE_LIMIT_HTTP_AUTHENTICATED,
listener::limiter::{InFlight, LimiterResult},
Server, KV_RATE_LIMIT_HTTP_ANONYMOUS, KV_RATE_LIMIT_HTTP_AUTHENTICATED,
};
use directory::Permission;
use trc::AddContext;
@ -18,47 +18,22 @@ use common::auth::AccessToken;
use std::future::Future;
pub trait RateLimiter: Sync + Send {
fn get_concurrency_limiter(&self, account_id: u32) -> Arc<ConcurrencyLimiters>;
fn is_http_authenticated_request_allowed(
&self,
access_token: &AccessToken,
) -> impl Future<Output = trc::Result<InFlight>> + Send;
) -> impl Future<Output = trc::Result<Option<InFlight>>> + Send;
fn is_http_anonymous_request_allowed(
&self,
addr: &IpAddr,
) -> impl Future<Output = trc::Result<()>> + Send;
fn is_upload_allowed(&self, access_token: &AccessToken) -> trc::Result<InFlight>;
fn is_upload_allowed(&self, access_token: &AccessToken) -> trc::Result<Option<InFlight>>;
}
impl RateLimiter for Server {
fn get_concurrency_limiter(&self, account_id: u32) -> Arc<ConcurrencyLimiters> {
self.inner
.data
.jmap_limiter
.get(&account_id)
.map(|limiter| limiter.clone())
.unwrap_or_else(|| {
let limiter = Arc::new(ConcurrencyLimiters {
concurrent_requests: ConcurrencyLimiter::new(
self.core.jmap.request_max_concurrent,
),
concurrent_uploads: ConcurrencyLimiter::new(
self.core.jmap.upload_max_concurrent,
),
});
self.inner
.data
.jmap_limiter
.insert(account_id, limiter.clone());
limiter
})
}
async fn is_http_authenticated_request_allowed(
&self,
access_token: &AccessToken,
) -> trc::Result<InFlight> {
let limiter = self.get_concurrency_limiter(access_token.primary_id());
) -> trc::Result<Option<InFlight>> {
let is_rate_allowed = if let Some(rate) = &self.core.jmap.rate_authenticated {
self.core
.storage
@ -77,15 +52,19 @@ impl RateLimiter for Server {
};
if is_rate_allowed {
if let Some(in_flight_request) = limiter.concurrent_requests.is_allowed() {
Ok(in_flight_request)
} else if access_token.has_permission(Permission::UnlimitedRequests) {
Ok(InFlight::default())
} else {
Err(trc::LimitEvent::ConcurrentRequest.into_err())
match access_token.is_http_request_allowed() {
LimiterResult::Allowed(in_flight) => Ok(Some(in_flight)),
LimiterResult::Forbidden => {
if access_token.has_permission(Permission::UnlimitedRequests) {
Ok(None)
} else {
Err(trc::LimitEvent::ConcurrentRequest.into_err())
}
}
LimiterResult::Disabled => Ok(None),
}
} else if access_token.has_permission(Permission::UnlimitedRequests) {
Ok(InFlight::default())
Ok(None)
} else {
Err(trc::LimitEvent::TooManyRequests.into_err())
}
@ -114,17 +93,17 @@ impl RateLimiter for Server {
Ok(())
}
fn is_upload_allowed(&self, access_token: &AccessToken) -> trc::Result<InFlight> {
if let Some(in_flight_request) = self
.get_concurrency_limiter(access_token.primary_id())
.concurrent_uploads
.is_allowed()
{
Ok(in_flight_request)
} else if access_token.has_permission(Permission::UnlimitedRequests) {
Ok(InFlight::default())
} else {
Err(trc::LimitEvent::ConcurrentUpload.into_err())
fn is_upload_allowed(&self, access_token: &AccessToken) -> trc::Result<Option<InFlight>> {
match access_token.is_upload_allowed() {
LimiterResult::Allowed(in_flight) => Ok(Some(in_flight)),
LimiterResult::Forbidden => {
if access_token.has_permission(Permission::UnlimitedRequests) {
Ok(None)
} else {
Err(trc::LimitEvent::ConcurrentUpload.into_err())
}
}
LimiterResult::Disabled => Ok(None),
}
}
}

View file

@ -7,7 +7,7 @@
use std::{
collections::BinaryHeap,
future::Future,
sync::{atomic::Ordering, Arc},
sync::Arc,
time::{Duration, Instant, SystemTime},
};
@ -39,7 +39,6 @@ struct Action {
#[derive(PartialEq, Eq, Debug)]
enum ActionClass {
Session,
Account,
Store(usize),
Acme(String),
@ -71,12 +70,6 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
{
let server = inner.build_server();
// Session purge
queue.schedule(
Instant::now() + server.core.jmap.session_purge_frequency.time_to_next(),
ActionClass::Session,
);
// Account purge
if server.core.network.roles.purge_accounts {
queue.schedule(
@ -324,37 +317,6 @@ pub fn spawn_housekeeper(inner: Arc<Inner>, mut rx: mpsc::Receiver<HousekeeperEv
server.purge(PurgeType::Account(None), 0).await;
});
}
ActionClass::Session => {
trc::event!(
Housekeeper(trc::HousekeeperEvent::Run),
Type = "purge_session"
);
let server = server.clone();
queue.schedule(
Instant::now()
+ server.core.jmap.session_purge_frequency.time_to_next(),
ActionClass::Session,
);
tokio::spawn(async move {
trc::event!(Purge(PurgeEvent::Started), Type = "session");
server
.inner
.data
.jmap_limiter
.retain(|_, limiter| limiter.is_active());
for throttle in [
&server.inner.data.smtp_session_throttle,
&server.inner.data.smtp_queue_throttle,
] {
throttle.retain(|_, v| {
v.concurrent.load(Ordering::Relaxed) > 0
});
}
});
}
ActionClass::Store(idx) => {
if let Some(schedule) =
server.core.storage.purge_schedules.get(idx).cloned()

View file

@ -9,8 +9,7 @@ use common::{
sasl::{sasl_decode_challenge_oauth, sasl_decode_challenge_plain},
AuthRequest,
},
listener::{limiter::ConcurrencyLimiter, SessionStream},
ConcurrencyLimiters,
listener::{limiter::LimiterResult, SessionStream},
};
use directory::Permission;
use imap_proto::{
@ -18,7 +17,6 @@ use imap_proto::{
receiver::{self, Request},
};
use mail_parser::decoders::base64::base64_decode;
use std::sync::Arc;
use crate::core::{Command, Session, State, StatusResponse};
@ -104,15 +102,12 @@ impl<T: SessionStream> Session<T> {
})?;
// Enforce concurrency limits
let in_flight = match self
.get_concurrency_limiter(access_token.primary_id())
.map(|limiter| limiter.concurrent_requests.is_allowed())
{
Some(Some(limiter)) => Some(limiter),
None => None,
Some(None) => {
let in_flight = match access_token.is_imap_request_allowed() {
LimiterResult::Allowed(in_flight) => Some(in_flight),
LimiterResult::Forbidden => {
return Err(trc::LimitEvent::ConcurrentRequest.into_err());
}
LimiterResult::Disabled => None,
};
// Create session
@ -135,27 +130,4 @@ impl<T: SessionStream> Session<T> {
Ok(StatusResponse::ok("Unauthenticate successful.").into_bytes())
}
pub fn get_concurrency_limiter(&self, account_id: u32) -> Option<Arc<ConcurrencyLimiters>> {
let rate = self.server.core.imap.rate_concurrent?;
self.server
.inner
.data
.imap_limiter
.get(&account_id)
.map(|limiter| limiter.clone())
.unwrap_or_else(|| {
let limiter = Arc::new(ConcurrencyLimiters {
concurrent_requests: ConcurrencyLimiter::new(rate),
concurrent_uploads: ConcurrencyLimiter::new(rate),
});
self.server
.inner
.data
.imap_limiter
.insert(account_id, limiter.clone());
limiter
})
.into()
}
}

View file

@ -9,13 +9,11 @@ use common::{
sasl::{sasl_decode_challenge_oauth, sasl_decode_challenge_plain},
AuthRequest,
},
listener::{limiter::ConcurrencyLimiter, SessionStream},
ConcurrencyLimiters,
listener::{limiter::LimiterResult, SessionStream},
};
use directory::Permission;
use mail_parser::decoders::base64::base64_decode;
use mail_send::Credentials;
use std::sync::Arc;
use crate::{
protocol::{request, Command, Mechanism},
@ -103,15 +101,12 @@ impl<T: SessionStream> Session<T> {
})?;
// Enforce concurrency limits
let in_flight = match self
.get_concurrency_limiter(access_token.primary_id())
.map(|limiter| limiter.concurrent_requests.is_allowed())
{
Some(Some(limiter)) => Some(limiter),
None => None,
Some(None) => {
let in_flight = match access_token.is_imap_request_allowed() {
LimiterResult::Allowed(in_flight) => Some(in_flight),
LimiterResult::Forbidden => {
return Err(trc::LimitEvent::ConcurrentRequest.into_err());
}
LimiterResult::Disabled => None,
};
// Fetch mailbox
@ -125,27 +120,4 @@ impl<T: SessionStream> Session<T> {
};
self.write_ok("Authentication successful").await
}
pub fn get_concurrency_limiter(&self, account_id: u32) -> Option<Arc<ConcurrencyLimiters>> {
let rate = self.server.core.imap.rate_concurrent?;
self.server
.inner
.data
.imap_limiter
.get(&account_id)
.map(|limiter| limiter.clone())
.unwrap_or_else(|| {
let limiter = Arc::new(ConcurrencyLimiters {
concurrent_requests: ConcurrencyLimiter::new(rate),
concurrent_uploads: ConcurrencyLimiter::new(rate),
});
self.server
.inner
.data
.imap_limiter
.insert(account_id, limiter.clone());
limiter
})
.into()
}
}

View file

@ -43,7 +43,6 @@ md5 = "0.7.0"
rayon = "1.5"
parking_lot = "0.12"
regex = "1.7.0"
dashmap = "6.0"
blake3 = "1.3"
lru-cache = "0.1.2"
rand = "0.8.5"

View file

@ -14,7 +14,7 @@ use std::{
use common::{
auth::AccessToken,
config::smtp::auth::VerifyStrategy,
listener::{asn::AsnGeoLookupResult, limiter::InFlight, ServerInstance},
listener::{asn::AsnGeoLookupResult, ServerInstance},
Inner, Server,
};
use directory::Directory;
@ -62,7 +62,6 @@ pub struct Session<T: AsyncWrite + AsyncRead> {
pub stream: T,
pub data: SessionData,
pub params: SessionParameters,
pub in_flight: Vec<InFlight>,
}
pub struct SessionData {
@ -247,7 +246,6 @@ impl Session<common::listener::stream::NullIo> {
can_expn: false,
can_vrfy: false,
},
in_flight: vec![],
}
}

View file

@ -5,12 +5,12 @@
*/
use common::{
config::smtp::{queue::QueueQuota, *},
config::smtp::*,
expr::{functions::ResolveVariable, *},
listener::{limiter::ConcurrencyLimiter, SessionStream},
listener::SessionStream,
ThrottleKey, KV_RATE_LIMIT_HASH,
};
use dashmap::mapref::entry::Entry;
use queue::QueueQuota;
use trc::SmtpEvent;
use utils::config::Rate;
@ -71,7 +71,7 @@ impl NewKey for QueueQuota {
}
}
impl NewKey for Throttle {
impl NewKey for QueueRateLimiter {
fn new_key(&self, e: &impl ResolveVariable) -> ThrottleKey {
let mut hasher = blake3::Hasher::new();
@ -129,13 +129,8 @@ impl NewKey for Throttle {
if (self.keys & THROTTLE_LOCAL_IP) != 0 {
hasher.update(e.resolve_variable(V_LOCAL_IP).to_string().as_bytes());
}
if let Some(rate_limit) = &self.rate {
hasher.update(&rate_limit.period.as_secs().to_ne_bytes()[..]);
hasher.update(&rate_limit.requests.to_ne_bytes()[..]);
}
if let Some(concurrency) = &self.concurrency {
hasher.update(&concurrency.to_ne_bytes()[..]);
}
hasher.update(&self.rate.period.as_secs().to_ne_bytes()[..]);
hasher.update(&self.rate.requests.to_ne_bytes()[..]);
ThrottleKey {
hash: hasher.finalize().into(),
@ -146,11 +141,11 @@ impl NewKey for Throttle {
impl<T: SessionStream> Session<T> {
pub async fn is_allowed(&mut self) -> bool {
let throttles = if !self.data.rcpt_to.is_empty() {
&self.server.core.smtp.session.throttle.rcpt_to
&self.server.core.smtp.queue.inbound_limiters.rcpt
} else if self.data.mail_from.is_some() {
&self.server.core.smtp.session.throttle.mail_from
&self.server.core.smtp.queue.inbound_limiters.sender
} else {
&self.server.core.smtp.session.throttle.connect
&self.server.core.smtp.queue.inbound_limiters.remote
};
for t in throttles {
@ -177,69 +172,34 @@ impl<T: SessionStream> Session<T> {
// Build throttle key
let key = t.new_key(self);
// Check concurrency
if let Some(concurrency) = &t.concurrency {
match self
.server
.inner
.data
.smtp_session_throttle
.entry(key.clone())
{
Entry::Occupied(mut e) => {
let limiter = e.get_mut();
if let Some(inflight) = limiter.is_allowed() {
self.in_flight.push(inflight);
} else {
trc::event!(
Smtp(SmtpEvent::ConcurrencyLimitExceeded),
SpanId = self.data.session_id,
Id = t.id.clone(),
Limit = limiter.max_concurrent
);
return false;
}
}
Entry::Vacant(e) => {
let limiter = ConcurrencyLimiter::new(*concurrency);
if let Some(inflight) = limiter.is_allowed() {
self.in_flight.push(inflight);
}
e.insert(limiter);
}
}
}
// Check rate
if let Some(rate) = &t.rate {
match self
.server
.core
.storage
.lookup
.is_rate_allowed(KV_RATE_LIMIT_HASH, key.hash.as_slice(), rate, false)
.await
{
Ok(Some(_)) => {
trc::event!(
Smtp(SmtpEvent::RateLimitExceeded),
SpanId = self.data.session_id,
Id = t.id.clone(),
Limit = vec![
trc::Value::from(rate.requests),
trc::Value::from(rate.period)
],
);
match self
.server
.core
.storage
.lookup
.is_rate_allowed(KV_RATE_LIMIT_HASH, key.hash.as_slice(), &t.rate, false)
.await
{
Ok(Some(_)) => {
trc::event!(
Smtp(SmtpEvent::RateLimitExceeded),
SpanId = self.data.session_id,
Id = t.id.clone(),
Limit = vec![
trc::Value::from(t.rate.requests),
trc::Value::from(t.rate.period)
],
);
return false;
}
Err(err) => {
trc::error!(err
.span_id(self.data.session_id)
.caused_by(trc::location!()));
}
_ => (),
return false;
}
Err(err) => {
trc::error!(err
.span_id(self.data.session_id)
.caused_by(trc::location!()));
}
_ => (),
}
}
}

View file

@ -23,6 +23,7 @@ impl SessionManager for SmtpSessionManager {
async fn handle<T: SessionStream>(self, session: listener::SessionData<T>) {
// Build server and create session
let server = self.inner.build_server();
let _in_flight = session.in_flight;
let mut session = Session {
data: SessionData::new(
session.local_ip,
@ -37,7 +38,6 @@ impl SessionManager for SmtpSessionManager {
instance: session.instance,
state: State::default(),
stream: session.stream,
in_flight: vec![session.in_flight],
params: SessionParameters::default(),
};
@ -266,7 +266,6 @@ impl<T: SessionStream> Session<T> {
data: self.data,
instance: self.instance,
server: self.server,
in_flight: self.in_flight,
params: self.params,
})
}

View file

@ -39,13 +39,13 @@ use crate::{
};
use super::{lookup::ToNextHop, mta_sts, session::SessionParams, NextHop, TlsStrategy};
use crate::queue::{throttle, DeliveryAttempt, Domain, Error, QueueEnvelope, Status};
use crate::queue::{Domain, Error, QueueEnvelope, QueuedMessage, Status};
impl DeliveryAttempt {
impl QueuedMessage {
pub fn try_deliver(self, server: Server) {
tokio::spawn(async move {
// Lock queue event
let queue_id = self.event.queue_id;
let queue_id = self.queue_id;
let status = if server.try_lock_event(queue_id).await {
if let Some(mut message) = server.read_message(queue_id).await {
// Generate span id
@ -98,8 +98,8 @@ impl DeliveryAttempt {
let mut batch = BatchBuilder::new();
batch.clear(ValueClass::Queue(QueueClass::MessageEvent(
store::write::QueueEvent {
due: self.event.due,
queue_id: self.event.queue_id,
due: self.due,
queue_id: self.queue_id,
},
)));
@ -138,7 +138,7 @@ impl DeliveryAttempt {
});
}
async fn deliver_task(mut self, server: Server, mut message: Message) -> QueueEventStatus {
async fn deliver_task(self, server: Server, mut message: Message) -> QueueEventStatus {
// Check that the message still has recipients to be delivered
let has_pending_delivery = message.has_pending_delivery();
let span_id = message.span_id;
@ -152,7 +152,7 @@ impl DeliveryAttempt {
if due > now() {
// Save changes
message
.save_changes(&server, self.event.due.into(), due.into())
.save_changes(&server, self.due.into(), due.into())
.await;
return QueueEventStatus::Deferred;
}
@ -164,62 +164,36 @@ impl DeliveryAttempt {
);
// All message recipients expired, do not re-queue. (DSN has been already sent)
message.remove(&server, self.event.due).await;
message.remove(&server, self.due).await;
return QueueEventStatus::Completed;
}
// Throttle sender
for throttle in &server.core.smtp.queue.throttle.sender {
if let Err(err) = server
.is_allowed(throttle, &message, &mut self.in_flight, message.span_id)
.await
{
let event = match err {
throttle::Error::Concurrency { limiter } => {
// Save changes to disk
let next_due = message.next_event_after(now());
message.save_changes(&server, None, None).await;
for throttle in &server.core.smtp.queue.outbound_limiters.sender {
if let Err(retry_at) = server.is_allowed(throttle, &message, message.span_id).await {
// Save changes to disk
let next_event = std::cmp::min(
retry_at,
message.next_event_after(now()).unwrap_or(u64::MAX),
);
trc::event!(
Delivery(DeliveryEvent::ConcurrencyLimitExceeded),
Id = throttle.id.clone(),
SpanId = span_id,
);
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
Id = throttle.id.clone(),
SpanId = span_id,
NextRetry = trc::Value::Timestamp(next_event)
);
QueueEventStatus::Limited {
limiters: vec![limiter],
next_due,
}
}
throttle::Error::Rate { retry_at } => {
// Save changes to disk
let next_event = std::cmp::min(
retry_at,
message.next_event_after(now()).unwrap_or(u64::MAX),
);
message
.save_changes(&server, self.due.into(), next_event.into())
.await;
trc::event!(
Delivery(DeliveryEvent::RateLimitExceeded),
Id = throttle.id.clone(),
SpanId = span_id,
NextRetry = trc::Value::Timestamp(next_event)
);
message
.save_changes(&server, self.event.due.into(), next_event.into())
.await;
QueueEventStatus::Deferred
}
};
return event;
return QueueEventStatus::Deferred;
}
}
let queue_config = &server.core.smtp.queue;
let mut on_hold = Vec::new();
let no_ip = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));
let mut recipients = std::mem::take(&mut message.recipients);
'next_domain: for domain_idx in 0..message.domains.len() {
@ -242,10 +216,9 @@ impl DeliveryAttempt {
let mut envelope = QueueEnvelope::new(&message, domain_idx);
// Throttle recipient domain
let mut in_flight = Vec::new();
for throttle in &queue_config.throttle.rcpt {
if let Err(err) = server
.is_allowed(throttle, &envelope, &mut in_flight, message.span_id)
for throttle in &queue_config.outbound_limiters.rcpt {
if let Err(retry_at) = server
.is_allowed(throttle, &envelope, message.span_id)
.await
{
trc::event!(
@ -255,7 +228,7 @@ impl DeliveryAttempt {
Domain = domain.domain.clone(),
);
message.domains[domain_idx].set_throttle_error(err, &mut on_hold);
message.domains[domain_idx].set_rate_limiter_error(retry_at);
continue 'next_domain;
}
}
@ -868,11 +841,10 @@ impl DeliveryAttempt {
envelope.local_ip = source_ip.unwrap_or(no_ip);
// Throttle remote host
let mut in_flight_host = Vec::new();
envelope.remote_ip = remote_ip;
for throttle in &queue_config.throttle.host {
if let Err(err) = server
.is_allowed(throttle, &envelope, &mut in_flight_host, message.span_id)
for throttle in &queue_config.outbound_limiters.remote {
if let Err(retry_at) = server
.is_allowed(throttle, &envelope, message.span_id)
.await
{
trc::event!(
@ -881,7 +853,7 @@ impl DeliveryAttempt {
Id = throttle.id.clone(),
RemoteIp = remote_ip,
);
message.domains[domain_idx].set_throttle_error(err, &mut on_hold);
message.domains[domain_idx].set_rate_limiter_error(retry_at);
continue 'next_domain;
}
}
@ -1322,21 +1294,7 @@ impl DeliveryAttempt {
server.send_dsn(&mut message).await;
// Notify queue manager
if !on_hold.is_empty() {
// Save changes to disk
let next_due = message.next_event_after(now());
message.save_changes(&server, None, None).await;
trc::event!(
Delivery(DeliveryEvent::ConcurrencyLimitExceeded),
SpanId = span_id,
);
QueueEventStatus::Limited {
limiters: on_hold,
next_due,
}
} else if let Some(due) = message.next_event() {
if let Some(due) = message.next_event() {
trc::event!(
Queue(trc::QueueEvent::Rescheduled),
SpanId = span_id,
@ -1347,7 +1305,7 @@ impl DeliveryAttempt {
// Save changes to disk
message
.save_changes(&server, self.event.due.into(), due.into())
.save_changes(&server, self.due.into(), due.into())
.await;
QueueEventStatus::Deferred
@ -1359,7 +1317,7 @@ impl DeliveryAttempt {
);
// Delete message from queue
message.remove(&server, self.event.due).await;
message.remove(&server, self.due).await;
QueueEventStatus::Completed
}

View file

@ -6,17 +6,14 @@
use std::borrow::Cow;
use common::{
config::{
server::ServerProtocol,
smtp::queue::{RelayHost, RequireOptional},
},
ipc::QueuedMessage,
use common::config::{
server::ServerProtocol,
smtp::queue::{RelayHost, RequireOptional},
};
use mail_send::Credentials;
use smtp_proto::{Response, Severity};
use crate::queue::{DeliveryAttempt, Error, ErrorDetails, HostResponse, Status};
use crate::queue::{Error, ErrorDetails, HostResponse, Status};
pub mod client;
pub mod dane;
@ -203,15 +200,6 @@ impl From<mta_sts::Error> for Status<(), Error> {
}
}
impl DeliveryAttempt {
pub fn new(event: QueuedMessage) -> Self {
DeliveryAttempt {
in_flight: Vec::new(),
event,
}
}
}
#[derive(Debug)]
pub enum NextHop<'x> {
Relay(&'x RelayHost),

View file

@ -22,7 +22,7 @@ use tokio::sync::mpsc;
use super::{
spool::{SmtpSpool, QUEUE_REFRESH},
DeliveryAttempt, Message, QueueId, Status,
Message, QueueId, Status,
};
pub struct Queue {
@ -96,13 +96,6 @@ impl Queue {
self.on_hold.insert(queue_id, OnHold::Locked { until });
self.on_hold.len() > 1 || has_back_pressure
}
QueueEventStatus::Limited { limiters, next_due } => {
self.on_hold.insert(
queue_id,
OnHold::ConcurrencyLimited { limiters, next_due },
);
!self.on_hold.is_empty() || has_back_pressure
}
QueueEventStatus::Deferred => {
self.on_hold.remove(&queue_id);
true
@ -129,7 +122,7 @@ impl Queue {
if refresh_queue || self.next_wake_up <= Instant::now() {
// If the number of in-flight messages is greater than the maximum allowed, skip the queue
let server = self.core.build_server();
let max_in_flight = server.core.smtp.queue.throttle.outbound_concurrency;
let max_in_flight = server.core.smtp.queue.max_threads;
has_back_pressure = in_flight_count >= max_in_flight;
if has_back_pressure {
self.next_wake_up = Instant::now() + Duration::from_secs(QUEUE_REFRESH);
@ -233,7 +226,7 @@ impl Queue {
// Deliver message
in_flight_count += 1;
self.on_hold.insert(queue_event.queue_id, OnHold::InFlight);
DeliveryAttempt::new(*queue_event).try_deliver(server.clone());
queue_event.try_deliver(server.clone());
} else {
let due_in = queue_event.due - now;
if due_in < next_wake_up {

View file

@ -10,11 +10,7 @@ use std::{
time::{Duration, Instant, SystemTime},
};
use common::{
expr::{self, functions::ResolveVariable, *},
ipc::QueuedMessage,
listener::limiter::InFlight,
};
use common::expr::{self, functions::ResolveVariable, *};
use serde::{Deserialize, Serialize};
use smtp_proto::Response;
use store::write::now;
@ -34,6 +30,12 @@ pub struct Schedule<T> {
pub inner: T,
}
#[derive(Debug, Clone, Copy)]
pub struct QueuedMessage {
pub due: u64,
pub queue_id: u64,
}
#[derive(Debug, Clone, Copy)]
pub enum MessageSource {
Authenticated,
@ -131,11 +133,6 @@ pub struct ErrorDetails {
pub details: String,
}
pub struct DeliveryAttempt {
pub in_flight: Vec<InFlight>,
pub event: QueuedMessage,
}
impl<T> Ord for Schedule<T> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.due.cmp(&self.due)

View file

@ -5,7 +5,7 @@
*/
use crate::queue::DomainPart;
use common::ipc::{QueueEvent, QueuedMessage};
use common::ipc::QueueEvent;
use common::{Server, KV_LOCK_QUEUE_MESSAGE};
use std::borrow::Cow;
use std::future::Future;
@ -17,7 +17,8 @@ use trc::ServerEvent;
use utils::BlobHash;
use super::{
Domain, Message, MessageSource, QueueEnvelope, QueueId, QuotaKey, Recipient, Schedule, Status,
Domain, Message, MessageSource, QueueEnvelope, QueueId, QueuedMessage, QuotaKey, Recipient,
Schedule, Status,
};
pub const LOCK_EXPIRY: u64 = 300;
@ -387,13 +388,11 @@ impl Message {
) -> bool {
debug_assert!(prev_event.is_some() == next_event.is_some());
let mut batch = BatchBuilder::new();
// Release quota for completed deliveries
let mut batch = BatchBuilder::new();
self.release_quota(&mut batch);
// Update message queue
let mut batch = BatchBuilder::new();
if let (Some(prev_event), Some(next_event)) = (prev_event, next_event) {
batch
.clear(ValueClass::Queue(QueueClass::MessageEvent(

View file

@ -7,42 +7,30 @@
use std::future::Future;
use common::{
config::smtp::Throttle,
expr::functions::ResolveVariable,
listener::limiter::{ConcurrencyLimiter, InFlight},
Server, KV_RATE_LIMIT_HASH,
config::smtp::QueueRateLimiter, expr::functions::ResolveVariable, Server, KV_RATE_LIMIT_HASH,
};
use dashmap::mapref::entry::Entry;
use store::write::now;
use crate::core::throttle::NewKey;
use super::{Domain, Status};
#[derive(Debug)]
pub enum Error {
Concurrency { limiter: ConcurrencyLimiter },
Rate { retry_at: u64 },
}
pub trait IsAllowed: Sync + Send {
fn is_allowed<'x>(
&'x self,
throttle: &'x Throttle,
throttle: &'x QueueRateLimiter,
envelope: &impl ResolveVariable,
in_flight: &mut Vec<InFlight>,
session_id: u64,
) -> impl Future<Output = Result<(), Error>> + Send;
) -> impl Future<Output = Result<(), u64>> + Send;
}
impl IsAllowed for Server {
async fn is_allowed<'x>(
&'x self,
throttle: &'x Throttle,
throttle: &'x QueueRateLimiter,
envelope: &impl ResolveVariable,
in_flight: &mut Vec<InFlight>,
session_id: u64,
) -> Result<(), Error> {
) -> Result<(), u64> {
if throttle.expr.is_empty()
|| self
.eval_expr(&throttle.expr, envelope, "throttle", session_id)
@ -51,63 +39,30 @@ impl IsAllowed for Server {
{
let key = throttle.new_key(envelope);
if let Some(rate) = &throttle.rate {
match self
.core
.storage
.lookup
.is_rate_allowed(KV_RATE_LIMIT_HASH, key.as_ref(), rate, false)
.await
{
Ok(Some(next_refill)) => {
trc::event!(
Queue(trc::QueueEvent::RateLimitExceeded),
SpanId = session_id,
Id = throttle.id.clone(),
Limit = vec![
trc::Value::from(rate.requests),
trc::Value::from(rate.period)
],
);
match self
.core
.storage
.lookup
.is_rate_allowed(KV_RATE_LIMIT_HASH, key.as_ref(), &throttle.rate, false)
.await
{
Ok(Some(next_refill)) => {
trc::event!(
Queue(trc::QueueEvent::RateLimitExceeded),
SpanId = session_id,
Id = throttle.id.clone(),
Limit = vec![
trc::Value::from(throttle.rate.requests),
trc::Value::from(throttle.rate.period)
],
);
return Err(Error::Rate {
retry_at: now() + next_refill,
});
}
Err(err) => {
trc::error!(err.span_id(session_id).caused_by(trc::location!()));
}
_ => (),
return Err(now() + next_refill);
}
}
if let Some(concurrency) = &throttle.concurrency {
match self.inner.data.smtp_queue_throttle.entry(key) {
Entry::Occupied(mut e) => {
let limiter = e.get_mut();
if let Some(inflight) = limiter.is_allowed() {
in_flight.push(inflight);
} else {
trc::event!(
Queue(trc::QueueEvent::ConcurrencyLimitExceeded),
SpanId = session_id,
Id = throttle.id.clone(),
Limit = limiter.max_concurrent,
);
return Err(Error::Concurrency {
limiter: limiter.clone(),
});
}
}
Entry::Vacant(e) => {
let limiter = ConcurrencyLimiter::new(*concurrency);
if let Some(inflight) = limiter.is_allowed() {
in_flight.push(inflight);
}
e.insert(limiter);
}
Err(err) => {
trc::error!(err.span_id(session_id).caused_by(trc::location!()));
}
_ => (),
}
}
@ -116,16 +71,8 @@ impl IsAllowed for Server {
}
impl Domain {
pub fn set_throttle_error(&mut self, err: Error, on_hold: &mut Vec<ConcurrencyLimiter>) {
match err {
Error::Concurrency { limiter } => {
on_hold.push(limiter);
self.status = Status::TemporaryFailure(super::Error::ConcurrencyLimited);
}
Error::Rate { retry_at } => {
self.retry.due = retry_at;
self.status = Status::TemporaryFailure(super::Error::RateLimited);
}
}
pub fn set_rate_limiter_error(&mut self, retry_at: u64) {
self.retry.due = retry_at;
self.status = Status::TemporaryFailure(super::Error::RateLimited);
}
}

File diff suppressed because it is too large Load diff

View file

@ -58,7 +58,6 @@ hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
hyper-util = { version = "0.1.1", features = ["tokio"] }
http-body-util = "0.1.0"
base64 = "0.22"
dashmap = "6.0"
ahash = { version = "0.8" }
serial_test = "3.0.0"
num_cpus = "1.15.0"

View file

@ -22,9 +22,6 @@ oauth.key = "0Wn7rO4UdmBoE8mp3cDcD9Qlpz3na74z7fGRoSuq8fVsGPelLl3KrHomBN8h2biA"
queue.quota.size.enable = true
queue.quota.size.messages = 100000
queue.quota.size.size = 10737418240
queue.throttle.rcpt.concurrency = 5
queue.throttle.rcpt.enable = true
queue.throttle.rcpt.key = "rcpt_domain"
report.analysis.addresses = "postmaster@*"
server.http.permissive-cors = true
server.listener.http.bind = "[::]:5002"
@ -46,13 +43,6 @@ server.listener.submission.protocol = "smtp"
server.listener.submissions.bind = "[::]:465"
server.listener.submissions.protocol = "smtp"
server.listener.submissions.tls.implicit = true
session.throttle.ip.concurrency = 5
session.throttle.ip.enable = true
session.throttle.ip.key = "remote_ip"
session.throttle.sender.enable = true
session.throttle.sender.key.0 = "sender_domain"
session.throttle.sender.key.1 = "rcpt"
session.throttle.sender.rate = "25/1h"
storage.blob = "rocksdb"
storage.data = "rocksdb"
storage.directory = "internal"

View file

@ -7,7 +7,7 @@ test
<!-- NEXT TEST -->
remote_ip 20.11.0.2
expect RBL_SENDERSCORE_REPUT_0 RBL_NIXSPAM RBL_SEM RBL_SPAMHAUS_SBL RBL_BARRACUDA RBL_BLOCKLISTDE RBL_VIRUSFREE_BOTNET RBL_SPAMCOP RCVD_IN_DNSWL_MED
expect RBL_SENDERSCORE_REPUT_0 RBL_SEM RBL_SPAMHAUS_SBL RBL_BARRACUDA RBL_BLOCKLISTDE RBL_VIRUSFREE_BOTNET RBL_SPAMCOP RCVD_IN_DNSWL_MED
Subject: test

View file

@ -1,12 +1,11 @@
[[throttle]]
match = "remote_ip == '127.0.0.1'"
key = ["remote_ip", "authenticated_as"]
concurrency = 100
rate = "50/30s"
enable = true
[[throttle]]
key = "sender_domain"
concurrency = 10000
rate = "50/30s"
enable = true

View file

@ -122,7 +122,7 @@ pub fn spawn_mock_imap_server(max_concurrency: u64) -> watch::Sender<bool> {
//println!("--- Accepted connection --- ");
let acceptor = acceptor.clone();
let in_flight = limited.is_allowed();
tokio::spawn(accept_imap(stream, acceptor, in_flight));
tokio::spawn(accept_imap(stream, acceptor, in_flight.into()));
}
Err(err) => {
panic!("Something went wrong: {err}" );

View file

@ -228,7 +228,7 @@ pub fn spawn_mock_lmtp_server(max_concurrency: u64) -> watch::Sender<bool> {
Ok((stream, _)) => {
let acceptor = acceptor.clone();
let in_flight = limited.is_allowed();
tokio::spawn(accept_smtp(stream, rx.clone(), acceptor, in_flight));
tokio::spawn(accept_smtp(stream, rx.clone(), acceptor, in_flight.into()));
}
Err(err) => {
panic!("Something went wrong: {err}" );

View file

@ -49,7 +49,6 @@ pub async fn test(params: &mut JMAPTest) {
.to_string();
// Reset rate limiters
server.inner.data.jmap_limiter.clear();
params.webhook.clear();
// Incorrect passwords should be rejected with a 401 error

View file

@ -381,7 +381,7 @@ pub async fn jmap_tests() {
email_query_changes::test(&mut params).await;
email_copy::test(&mut params).await;
thread_get::test(&mut params).await;
thread_merge::test(&mut params).await;
thread_merge::test(&mut params).await;*/
mailbox::test(&mut params).await;
delivery::test(&mut params).await;
auth_acl::test(&mut params).await;
@ -392,7 +392,7 @@ pub async fn jmap_tests() {
sieve_script::test(&mut params).await;
vacation_response::test(&mut params).await;
email_submission::test(&mut params).await;
websocket::test(&mut params).await;*/
websocket::test(&mut params).await;
quota::test(&mut params).await;
crypto::test(&mut params).await;
blob::test(&mut params).await;

View file

@ -9,11 +9,12 @@ use std::{fs, net::IpAddr, path::PathBuf, sync::Arc, time::Duration};
use common::{
config::{
server::{Listener, Listeners, ServerProtocol, TcpListener},
smtp::{throttle::parse_throttle, *},
smtp::*,
},
expr::{functions::ResolveVariable, if_block::*, tokenizer::TokenMap, *},
Server,
};
use throttle::parse_queue_rate_limiter;
use tokio::net::TcpSocket;
use utils::config::{Config, Rate};
@ -241,7 +242,7 @@ fn parse_throttles() {
file.push("throttle.toml");
let mut config = Config::new(fs::read_to_string(file).unwrap()).unwrap();
let throttle = parse_throttle(
let throttle = parse_queue_rate_limiter(
&mut config,
"throttle",
&TokenMap::default().with_variables(&[
@ -261,7 +262,7 @@ fn parse_throttles() {
assert_eq!(
throttle,
vec![
Throttle {
QueueRateLimiter {
id: "0000".to_string(),
expr: Expression {
items: vec![
@ -271,19 +272,19 @@ fn parse_throttles() {
]
},
keys: THROTTLE_REMOTE_IP | THROTTLE_AUTH_AS,
concurrency: 100.into(),
rate: Rate {
requests: 50,
period: Duration::from_secs(30)
}
.into()
},
Throttle {
QueueRateLimiter {
id: "0001".to_string(),
expr: Expression::default(),
keys: THROTTLE_SENDER_DOMAIN,
concurrency: 10000.into(),
rate: None
rate: Rate {
requests: 50,
period: Duration::from_secs(30)
}
}
]
);

View file

@ -59,7 +59,7 @@ is-allowed = "sender_domain != 'blocked.com'"
size = [{if = "remote_ip = '10.0.0.2'", then = 2048},
{else = 1024}]
[[session.throttle]]
[[queue.limiter.inbound]]
match = "remote_ip = '10.0.0.1'"
key = 'sender'
rate = '2/1s'

View file

@ -7,7 +7,7 @@
use std::time::Duration;
use common::{
ipc::{DmarcEvent, QueueEvent, QueueEventStatus, QueuedMessage, ReportingEvent, TlsEvent},
ipc::{DmarcEvent, QueueEvent, QueueEventStatus, ReportingEvent, TlsEvent},
Server,
};
use store::{
@ -16,7 +16,7 @@ use store::{
};
use tokio::sync::mpsc::error::TryRecvError;
use smtp::queue::{DeliveryAttempt, Message, QueueId};
use smtp::queue::{Message, QueueId, QueuedMessage};
use super::{QueueReceiver, ReportReceiver};
@ -131,17 +131,17 @@ impl QueueReceiver {
message
}
pub async fn expect_message_then_deliver(&mut self) -> DeliveryAttempt {
pub async fn expect_message_then_deliver(&mut self) -> QueuedMessage {
let message = self.expect_message().await;
self.delivery_attempt(message.queue_id).await
}
pub async fn delivery_attempt(&mut self, queue_id: u64) -> DeliveryAttempt {
DeliveryAttempt::new(QueuedMessage {
pub async fn delivery_attempt(&mut self, queue_id: u64) -> QueuedMessage {
QueuedMessage {
due: self.message_due(queue_id).await,
queue_id,
})
}
}
pub async fn read_queued_events(&self) -> Vec<store::write::QueueEvent> {
@ -302,7 +302,6 @@ pub trait TestQueueEvent {
fn assert_refresh(self);
fn assert_done(self);
fn assert_refresh_or_done(self);
fn assert_on_hold(self);
}
impl TestQueueEvent for QueueEvent {
@ -336,16 +335,6 @@ impl TestQueueEvent for QueueEvent {
e => panic!("Unexpected event: {e:?}"),
}
}
fn assert_on_hold(self) {
match self {
QueueEvent::WorkerDone {
status: QueueEventStatus::Limited { .. },
..
} => (),
e => panic!("Unexpected event: {e:?}"),
}
}
}
pub trait TestReportingEvent {

View file

@ -74,7 +74,7 @@ wait = [{if = "remote_ip = '10.0.0.1'", then = '5ms'},
dsn = [{if = "remote_ip = '10.0.0.1'", then = false},
{else = true}]
[[session.throttle]]
[[queue.limiter.inbound]]
match = "remote_ip = '10.0.0.1' && !is_empty(rcpt)"
key = 'sender'
rate = '2/1s'

View file

@ -23,19 +23,18 @@ fts = "rocksdb"
type = "rocksdb"
path = "{TMP}/data.db"
[[session.throttle]]
[[queue.limiter.inbound]]
match = "remote_ip = '10.0.0.1'"
key = 'remote_ip'
concurrency = 2
rate = '3/1s'
rate = '2/1s'
enable = true
[[session.throttle]]
[[queue.limiter.inbound]]
key = 'sender'
rate = '2/1s'
enable = true
[[session.throttle]]
[[queue.limiter.inbound]]
key = ['remote_ip', 'rcpt']
rate = '2/1s'
enable = true
@ -52,24 +51,12 @@ async fn throttle_inbound() {
let stores = Stores::parse_all(&mut config, false).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
// Test connection concurrency limit
// Test connection rate limit
let mut session = Session::test(TestSMTP::from_core(core).server);
session.data.remote_ip_str = "10.0.0.1".to_string();
assert!(
session.is_allowed().await,
"Concurrency limiter too strict."
);
assert!(
session.is_allowed().await,
"Concurrency limiter too strict."
);
assert!(!session.is_allowed().await, "Concurrency limiter failed.");
// Test connection rate limit
session.in_flight.clear(); // Manually reset concurrency limiter
assert!(session.is_allowed().await, "Rate limiter too strict.");
assert!(session.is_allowed().await, "Rate limiter too strict.");
assert!(!session.is_allowed().await, "Rate limiter failed.");
session.in_flight.clear();
tokio::time::sleep(Duration::from_millis(1100)).await;
assert!(
session.is_allowed().await,

View file

@ -12,7 +12,7 @@ use crate::smtp::{
DnsCache, TestSMTP,
};
use common::{config::server::ServerProtocol, ipc::QueueEvent};
use smtp::queue::{spool::SmtpSpool, DeliveryAttempt};
use smtp::queue::spool::SmtpSpool;
use store::write::now;
const REMOTE: &str = "
@ -127,7 +127,7 @@ async fn lmtp_delivery() {
message.clone().remove(&core, event.due).await;
dsn.push(message);
} else {
DeliveryAttempt::new(event).try_deliver(core.clone());
event.try_deliver(core.clone());
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

View file

@ -15,7 +15,7 @@ use crate::smtp::{
session::{TestSession, VerifyResponse},
DnsCache, TestSMTP,
};
use smtp::queue::{spool::SmtpSpool, DeliveryAttempt};
use smtp::queue::spool::SmtpSpool;
const LOCAL: &str = r#"
[session.rcpt]
@ -159,7 +159,7 @@ async fn smtp_delivery() {
for (idx, domain) in message.domains.iter().enumerate() {
domain_retries[idx] = domain.retry.inner;
}
DeliveryAttempt::new(event).try_deliver(core.clone());
event.try_deliver(core.clone());
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

View file

@ -26,37 +26,34 @@ retry = "1h"
notify = "1h"
expire = "1h"
[[queue.throttle]]
[[queue.limiter.outbound]]
match = "sender_domain = 'foobar.org'"
key = 'sender_domain'
concurrency = 1
enable = true
[[queue.throttle]]
[[queue.limiter.outbound]]
match = "sender_domain = 'foobar.net'"
key = 'sender_domain'
rate = '1/30m'
enable = true
[[queue.throttle]]
[[queue.limiter.outbound]]
match = "rcpt_domain = 'example.org'"
key = 'rcpt_domain'
concurrency = 1
enable = true
[[queue.throttle]]
[[queue.limiter.outbound]]
match = "rcpt_domain = 'example.net'"
key = 'rcpt_domain'
rate = '1/40m'
enable = true
[[queue.throttle]]
[[queue.limiter.outbound]]
match = "mx = 'mx.test.org'"
key = 'mx'
concurrency = 1
enable = true
[[queue.throttle]]
[[queue.limiter.outbound]]
match = "mx = 'mx.test.net'"
key = 'mx'
rate = '1/50m'
@ -88,43 +85,30 @@ async fn throttle_outbound() {
);
// Throttle sender
let mut in_flight = vec![];
let throttle = &core.core.smtp.queue.throttle;
let throttle = &core.core.smtp.queue.outbound_limiters;
for t in &throttle.sender {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, 0, ""),
&mut in_flight,
0,
)
.await
.unwrap();
core.is_allowed(t, &QueueEnvelope::test(&test_message, 0, ""), 0)
.await
.unwrap();
}
assert!(!in_flight.is_empty());
// Expect concurrency throttle for sender domain 'foobar.org'
local
/*local
.queue_receiver
.expect_message_then_deliver()
.await
.try_deliver(core.clone());
tokio::time::sleep(Duration::from_millis(100)).await;
local.queue_receiver.read_event().await.assert_on_hold();
in_flight.clear();
local.queue_receiver.read_event().await.assert_on_hold();*/
// Expect rate limit throttle for sender domain 'foobar.net'
test_message.return_path_domain = "foobar.net".to_string();
for t in &throttle.sender {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, 0, ""),
&mut in_flight,
0,
)
.await
.unwrap();
core.is_allowed(t, &QueueEnvelope::test(&test_message, 0, ""), 0)
.await
.unwrap();
}
assert!(in_flight.is_empty());
session
.send_message("john@foobar.net", &["bill@test.org"], "test:no_dkim", "250")
.await;
@ -148,17 +132,12 @@ async fn throttle_outbound() {
status: Status::Scheduled,
});
for t in &throttle.rcpt {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, 0, ""),
&mut in_flight,
0,
)
.await
.unwrap();
core.is_allowed(t, &QueueEnvelope::test(&test_message, 0, ""), 0)
.await
.unwrap();
}
assert!(!in_flight.is_empty());
session
/*session
.send_message(
"john@test.net",
&["jane@example.org"],
@ -172,8 +151,7 @@ async fn throttle_outbound() {
.await
.try_deliver(core.clone());
tokio::time::sleep(Duration::from_millis(100)).await;
local.queue_receiver.read_event().await.assert_on_hold();
in_flight.clear();
local.queue_receiver.read_event().await.assert_on_hold();*/
// Expect rate limit throttle for recipient domain 'example.net'
test_message.domains.push(Domain {
@ -184,16 +162,11 @@ async fn throttle_outbound() {
status: Status::Scheduled,
});
for t in &throttle.rcpt {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, 1, ""),
&mut in_flight,
0,
)
.await
.unwrap();
core.is_allowed(t, &QueueEnvelope::test(&test_message, 1, ""), 0)
.await
.unwrap();
}
assert!(in_flight.is_empty());
session
.send_message(
"john@test.net",
@ -233,18 +206,13 @@ async fn throttle_outbound() {
expires: 0,
status: Status::Scheduled,
});
for t in &throttle.host {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, 2, "mx.test.org"),
&mut in_flight,
0,
)
.await
.unwrap();
for t in &throttle.remote {
core.is_allowed(t, &QueueEnvelope::test(&test_message, 2, "mx.test.org"), 0)
.await
.unwrap();
}
assert!(!in_flight.is_empty());
session
/*session
.send_message("john@test.net", &["jane@test.org"], "test:no_dkim", "250")
.await;
local
@ -252,8 +220,7 @@ async fn throttle_outbound() {
.expect_message_then_deliver()
.await
.try_deliver(core.clone());
local.queue_receiver.read_event().await.assert_on_hold();
in_flight.clear();
local.queue_receiver.read_event().await.assert_on_hold();*/
// Expect rate limit throttle for mx 'mx.test.net'
core.mx_add(
@ -269,17 +236,12 @@ async fn throttle_outbound() {
vec!["127.0.0.1".parse().unwrap()],
Instant::now() + Duration::from_secs(10),
);
for t in &throttle.host {
core.is_allowed(
t,
&QueueEnvelope::test(&test_message, 1, "mx.test.net"),
&mut in_flight,
0,
)
.await
.unwrap();
for t in &throttle.remote {
core.is_allowed(t, &QueueEnvelope::test(&test_message, 1, "mx.test.net"), 0)
.await
.unwrap();
}
assert!(in_flight.is_empty());
session
.send_message("john@test.net", &["jane@test.net"], "test:no_dkim", "250")
.await;

View file

@ -13,7 +13,7 @@ use crate::smtp::{
};
use ahash::AHashSet;
use common::ipc::{QueueEvent, QueueEventStatus};
use smtp::queue::{spool::SmtpSpool, DeliveryAttempt};
use smtp::queue::spool::SmtpSpool;
use store::write::now;
const CONFIG: &str = r#"
@ -85,7 +85,7 @@ async fn queue_retry() {
let attempt = qr.expect_message_then_deliver().await;
let mut dsn = Vec::new();
let mut retries = Vec::new();
in_fight.insert(attempt.event.queue_id);
in_fight.insert(attempt.queue_id);
attempt.try_deliver(core.clone());
loop {
@ -122,7 +122,7 @@ async fn queue_retry() {
} else {
retries.push(event.due.saturating_sub(now));
in_fight.insert(event.queue_id);
DeliveryAttempt::new(event).try_deliver(core.clone());
event.try_deliver(core.clone());
tokio::time::sleep(Duration::from_millis(100)).await;
}
}

View file

@ -116,7 +116,6 @@ impl TestSession for Session<DummyIo> {
0,
),
params: SessionParameters::default(),
in_flight: vec![],
hostname: "localhost".to_string(),
}
}