mirror of https://github.com/stalwartlabs/mail-server.git (synced 2025-11-08 04:46:22 +08:00)
Added queue backpressure event

This commit is contained in:
parent 7142a8caab
commit 6bb468ca5d

8 changed files with 133 additions and 22 deletions

@@ -26,8 +26,15 @@ impl<T: SessionStream> Session<T> {
        if self.instance.id.ends_with("-debug") {
            if to.address.contains("fail@") {
                return self.write(b"503 5.5.1 Invalid recipient.\r\n").await;
            } else if to.address.contains("delay@") {
            } else if (to.address.contains("delay-random@") && rand::random())
                || to.address.contains("delay@")
            {
                return self.write(b"451 4.5.3 Try again later.\r\n").await;
            } else if to.address.contains("slow@") {
                tokio::time::sleep(std::time::Duration::from_secs(
                    rand::random::<u64>() % 5 + 5,
                ))
                .await;
            }
        }

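A note on the test hooks above: for "slow@" recipients, rand::random::<u64>() % 5 + 5 picks a delay of 5 to 9 seconds, and on the "delay-random@" branch the bare rand::random() is a coin-flip bool, so roughly half of those messages receive the temporary "451 4.5.3" deferral. A standalone illustration of that arithmetic (assumes the rand crate; not part of the server code):

fn main() {
    // slow@ recipients: uniform delay in the 5..=9 second range.
    let delay_secs = rand::random::<u64>() % 5 + 5;
    assert!((5..=9).contains(&delay_secs));

    // delay-random@ recipients: deferred with a 451 response about half the time.
    let deferred: bool = rand::random();
    println!("delay = {delay_secs}s, deferred = {deferred}");
}
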
@@ -32,6 +32,7 @@ pub struct Queue {
    pub rx: mpsc::Receiver<QueueEvent>,
}

#[derive(Debug)]
pub enum OnHold {
    InFlight,
    ConcurrencyLimited {

@@ -52,6 +53,7 @@ impl SpawnQueue for mpsc::Receiver<QueueEvent> {
}

const CLEANUP_INTERVAL: Duration = Duration::from_secs(10 * 60);
const BACK_PRESSURE_WARN_INTERVAL: Duration = Duration::from_secs(60);

impl Queue {
    pub fn new(core: Arc<Inner>, rx: mpsc::Receiver<QueueEvent>) -> Self {

@@ -66,6 +68,7 @@ impl Queue {
    pub async fn start(&mut self) {
        let mut is_paused = false;
        let mut next_cleanup = Instant::now() + CLEANUP_INTERVAL;
        let mut last_backpressure_warning = Instant::now() - BACK_PRESSURE_WARN_INTERVAL;
        let mut in_flight_count = 0;
        let mut has_back_pressure = false;

@@ -129,6 +132,25 @@ impl Queue {
            let max_in_flight = server.core.smtp.queue.throttle.outbound_concurrency;
            has_back_pressure = in_flight_count >= max_in_flight;
            if has_back_pressure {
                self.next_wake_up = Instant::now() + Duration::from_secs(QUEUE_REFRESH);

                if last_backpressure_warning.elapsed() >= BACK_PRESSURE_WARN_INTERVAL {
                    let queue_events = server.next_event().await;
                    last_backpressure_warning = Instant::now();
                    trc::event!(
                        Queue(trc::QueueEvent::BackPressure),
                        Reason =
                            "Queue outbound processing capacity for this node exceeded.",
                        Total = queue_events.len(),
                        Details = self
                            .on_hold
                            .keys()
                            .copied()
                            .map(trc::Value::from)
                            .collect::<Vec<_>>(),
                        Limit = max_in_flight,
                    );
                }
                continue;
            }

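The hunk above is the heart of the change: once the number of in-flight deliveries reaches the configured outbound concurrency limit, the queue manager skips picking up new work until the next refresh and emits a BackPressure warning at most once per BACK_PRESSURE_WARN_INTERVAL. A minimal, self-contained sketch of that throttled-warning pattern, using simplified names rather than the real Stalwart types:

use std::time::{Duration, Instant};

const BACK_PRESSURE_WARN_INTERVAL: Duration = Duration::from_secs(60);

struct QueueState {
    in_flight: usize,
    max_in_flight: usize,
    last_warning: Instant,
}

impl QueueState {
    // Returns true when no new deliveries should be started right now,
    // warning at most once per interval so the log is not flooded.
    fn has_back_pressure(&mut self) -> bool {
        if self.in_flight < self.max_in_flight {
            return false;
        }
        if self.last_warning.elapsed() >= BACK_PRESSURE_WARN_INTERVAL {
            self.last_warning = Instant::now();
            eprintln!(
                "back pressure: {} in flight, limit {}",
                self.in_flight, self.max_in_flight
            );
        }
        true
    }
}

fn main() {
    let mut state = QueueState {
        in_flight: 4,
        max_in_flight: 4,
        // Start one interval in the past, as the real code does above, so the
        // first warning is logged immediately rather than a minute later.
        last_warning: Instant::now() - BACK_PRESSURE_WARN_INTERVAL,
    };
    assert!(state.has_back_pressure()); // warns once
    assert!(state.has_back_pressure()); // still back-pressured, but silent
    state.in_flight = 1;
    assert!(!state.has_back_pressure());
}
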
@@ -146,11 +168,18 @@ impl Queue {
                // Enforce global concurrency limits
                if in_flight_count >= max_in_flight {
                    has_back_pressure = true;
                    trc::event!(
                        Queue(trc::QueueEvent::ConcurrencyLimitExceeded),
                        Details = "Outbound concurrency limit exceeded.",
                        Limit = max_in_flight,
                    );
                    if last_backpressure_warning.elapsed()
                        >= BACK_PRESSURE_WARN_INTERVAL
                    {
                        last_backpressure_warning = Instant::now();
                        trc::event!(
                            Queue(trc::QueueEvent::BackPressure),
                            Reason = "Queue outbound processing capacity for this node exceeded.",
                            Total = queue_events.len(),
                            Details = self.on_hold.keys().copied().map(trc::Value::from).collect::<Vec<_>>(),
                            Limit = max_in_flight,
                        );
                    }
                    break;
                }

@@ -720,6 +720,7 @@ impl QueueEvent {
            QueueEvent::QueueReport => "Queued report for delivery",
            QueueEvent::QueueDsn => "Queued DSN for delivery",
            QueueEvent::QueueAutogenerated => "Queued autogenerated message for delivery",
            QueueEvent::BackPressure => "Queue backpressure detected",
        }
    }

@@ -738,6 +739,9 @@ impl QueueEvent {
            QueueEvent::QueueReport => "A new report was queued for delivery",
            QueueEvent::QueueDsn => "A delivery status notification was queued for delivery",
            QueueEvent::QueueAutogenerated => "A system generated message was queued for delivery",
            QueueEvent::BackPressure => {
                "Queue congested, processing can't keep up with incoming message rate"
            }
        }
    }
}

@@ -459,6 +459,7 @@ impl EventType {
                DeliveryEvent::RawInput | DeliveryEvent::RawOutput => Level::Trace,
            },
            EventType::Queue(event) => match event {
                QueueEvent::BackPressure => Level::Warn,
                QueueEvent::QueueMessage
                | QueueEvent::QueueMessageAuthenticated
                | QueueEvent::QueueReport

@@ -488,6 +488,7 @@ pub enum QueueEvent {
    RateLimitExceeded,
    ConcurrencyLimitExceeded,
    QuotaExceeded,
    BackPressure,
}

#[event_type]

@@ -864,6 +864,7 @@ impl EventType {
            EventType::Spam(SpamEvent::Dnsbl) => 562,
            EventType::Spam(SpamEvent::DnsblError) => 563,
            EventType::Spam(SpamEvent::Pyzor) => 564,
            EventType::Queue(QueueEvent::BackPressure) => 48,
        }
    }

@@ -1465,12 +1466,13 @@ impl EventType {
            562 => Some(EventType::Spam(SpamEvent::Dnsbl)),
            563 => Some(EventType::Spam(SpamEvent::DnsblError)),
            564 => Some(EventType::Spam(SpamEvent::Pyzor)),
            48 => Some(EventType::Queue(QueueEvent::BackPressure)),
            _ => None,
        }
    }
}

// 57 48 147 148 335 336 376 458 459
// 57 147 148 335 336 376 458 459

impl Key {
    fn code(&self) -> u64 {

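These two hunks register numeric code 48 for the new event in both directions of the id mapping (and drop 48 from the free-codes comment). A reduced sketch of that round-trip pattern, using a hypothetical two-variant enum rather than the real EventType (the 47 below is a placeholder, not the actual code for ConcurrencyLimitExceeded):

#[derive(Clone, Copy, Debug, PartialEq)]
enum QueueEvent {
    ConcurrencyLimitExceeded,
    BackPressure,
}

impl QueueEvent {
    fn id(self) -> u64 {
        match self {
            QueueEvent::ConcurrencyLimitExceeded => 47, // placeholder value
            QueueEvent::BackPressure => 48,
        }
    }

    fn from_id(id: u64) -> Option<Self> {
        match id {
            47 => Some(QueueEvent::ConcurrencyLimitExceeded),
            48 => Some(QueueEvent::BackPressure),
            _ => None,
        }
    }
}

fn main() {
    // Forward and reverse tables must stay in sync, or persisted
    // event ids decode to the wrong variant.
    assert_eq!(
        QueueEvent::from_id(QueueEvent::BackPressure.id()),
        Some(QueueEvent::BackPressure)
    );
}
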
@@ -137,15 +137,34 @@ cert = '%{file:{CERT}}%'
private-key = '%{file:{PK}}%'

[storage]
data = "rocksdb"
lookup = "rocksdb"
blob = "rocksdb"
fts = "rocksdb"
data = "{STORE}"
fts = "{STORE}"
blob = "{STORE}"
lookup = "{STORE}"

[store."rocksdb"]
type = "rocksdb"
path = "{TMP}/queue.db"

[store."foundationdb"]
type = "foundationdb"

[store."postgresql"]
type = "postgresql"
host = "localhost"
port = 5432
database = "stalwart"
user = "postgres"
password = "mysecretpassword"

[store."mysql"]
type = "mysql"
host = "localhost"
port = 3307
database = "stalwart"
user = "root"
password = "password"

"#;

impl TestSMTP {

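The test config now declares one [store.*] section per backend and points every [storage] role at a "{STORE}" placeholder; TestSMTP::with_database in the next hunk swaps that placeholder for the requested store id before the config is parsed. A trimmed-down sketch of the substitution, using plain str::replace on a hypothetical template:

const CONFIG_TEMPLATE: &str = r#"
[storage]
data = "{STORE}"
blob = "{STORE}"

[store."mysql"]
type = "mysql"
"#;

fn config_for(store_id: &str) -> String {
    // Same idea as the .replace("{STORE}", store_id.as_ref()) call below.
    CONFIG_TEMPLATE.replace("{STORE}", store_id)
}

fn main() {
    let cfg = config_for("mysql");
    assert!(cfg.contains(r#"data = "mysql""#));
    assert!(!cfg.contains("{STORE}"));
}
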
@@ -198,9 +217,21 @@ impl TestSMTP {
    }

    pub async fn new(name: &str, config: impl AsRef<str>) -> TestSMTP {
        Self::with_database(name, config, "rocksdb").await
    }

    pub async fn with_database(
        name: &str,
        config: impl AsRef<str>,
        store_id: impl AsRef<str>,
    ) -> TestSMTP {
        let temp_dir = TempDir::new(name, true);
        let mut config =
            Config::new(temp_dir.update_config(add_test_certs(CONFIG) + config.as_ref())).unwrap();
        let mut config = Config::new(
            temp_dir
                .update_config(add_test_certs(CONFIG) + config.as_ref())
                .replace("{STORE}", store_id.as_ref()),
        )
        .unwrap();
        config.resolve_all_macros().await;
        let stores = Stores::parse_all(&mut config, false).await;
        let core = Core::parse(&mut config, stores, Default::default()).await;

@@ -23,8 +23,12 @@ relay = true
messages = 2000

[queue.outbound]
concurrency = 1
concurrency = 4

[queue.schedule]
retry = "1s"
notify = "1d"
expire = "1d"
"#;

const REMOTE: &str = r#"

@@ -39,7 +43,10 @@ enable = false
"#;

#[tokio::test]
const NUM_MESSAGES: usize = 1000;
const NUM_QUEUES: usize = 10;

#[tokio::test(flavor = "multi_thread", worker_threads = 18)]
#[serial_test::serial]
async fn concurrent_queue() {
    // Enable logging

@@ -49,7 +56,7 @@ async fn concurrent_queue() {
    let remote = TestSMTP::new("smtp_concurrent_queue_remote", REMOTE).await;
    let _rx = remote.start(&[ServerProtocol::Smtp]).await;

    let local = TestSMTP::new("smtp_concurrent_queue_local", LOCAL).await;
    let local = TestSMTP::with_database("smtp_concurrent_queue_local", LOCAL, "mysql").await;

    // Add mock DNS entries
    let core = local.build_smtp();

@@ -72,9 +79,9 @@ async fn concurrent_queue() {
    session.eval_session_params().await;
    session.ehlo("mx.test.org").await;

    // Spawn 20 concurrent queues
    // Spawn concurrent queues
    let mut inners = vec![];
    for _ in 0..20 {
    for _ in 0..NUM_QUEUES {
        let (inner, rxs) = local.inner_with_rxs();
        let server = inner.build_server();
        server.mx_add(

@@ -99,9 +106,9 @@ async fn concurrent_queue() {
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Send 1000 test messages
    for _ in 0..100 {
    for _ in 0..(NUM_MESSAGES / 2) {
        session
            .send_message("john@test.org", &["bill@foobar.org"], "test:no_dkim", "250")
            .send_message("john@test.org", &["slow@foobar.org"], "test:no_dkim", "250")
            .await;
    }

@@ -109,12 +116,41 @@ async fn concurrent_queue() {
    for inner in &inners {
        inner.ipc.queue_tx.send(QueueEvent::Refresh).await.unwrap();
    }
    for _ in 0..(NUM_MESSAGES / 2) {
        session
            .send_message(
                "john@test.org",
                &["delay-random@foobar.org"],
                "test:no_dkim",
                "250",
            )
            .await;
    }

    tokio::time::sleep(Duration::from_millis(1500)).await;
    loop {
        tokio::time::sleep(Duration::from_millis(1500)).await;

        let m = local.queue_receiver.read_queued_messages().await.len();
        let e = local.queue_receiver.read_queued_events().await.len();

        if m + e != 0 {
            println!("Queue still has {} messages and {} events", m, e);
            for inner in &inners {
                inner
                    .ipc
                    .queue_tx
                    .send(QueueEvent::Paused(true))
                    .await
                    .unwrap();
            }
        } else {
            break;
        }
    }

    local.queue_receiver.assert_queue_is_empty().await;
    let remote_messages = remote.queue_receiver.read_queued_messages().await;
    assert_eq!(remote_messages.len(), 100);
    assert_eq!(remote_messages.len(), NUM_MESSAGES);

    // Make sure local store is queue
    core.core

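On the final assertion: the test sends NUM_MESSAGES / 2 messages to slow@foobar.org and NUM_MESSAGES / 2 to delay-random@foobar.org, then drains the queues and expects every one of them to have reached the remote. The bookkeeping behind the changed assert_eq!, spelled out as a tiny check (constants copied from the test):

const NUM_MESSAGES: usize = 1000;

fn main() {
    let slow = NUM_MESSAGES / 2;     // sent to slow@foobar.org
    let delayed = NUM_MESSAGES / 2;  // sent to delay-random@foobar.org, retried until accepted
    // Every message is eventually delivered, so the remote queue holds all of them.
    assert_eq!(slow + delayed, NUM_MESSAGES);
}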