ASN/Geolocation support

This commit is contained in:
mdecimus 2024-12-13 11:32:52 +01:00
parent b5696c2d26
commit 08c43e2f15
27 changed files with 919 additions and 167 deletions

49
Cargo.lock generated
View file

@ -30,7 +30,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -811,7 +811,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -820,7 +820,7 @@ version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -829,7 +829,7 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -1072,7 +1072,7 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12f8e7987cbd042a63249497f41aed09f8e65add917ea6566effbc56578d6801"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -1435,7 +1435,7 @@ version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dc92fb57ca44df6db8059111ab3af99a63d5d0f8375d9972e319a379c6bab76"
dependencies = [
"generic-array 0.14.7",
"generic-array",
"rand_core 0.6.4",
"subtle",
"zeroize",
@ -1447,7 +1447,7 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array 0.14.7",
"generic-array",
"rand_core 0.6.4",
"typenum",
]
@ -1458,7 +1458,7 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6"
dependencies = [
"generic-array 0.14.7",
"generic-array",
"subtle",
]
@ -1615,7 +1615,7 @@ version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd2735a791158376708f9347fe8faba9667589d82427ef3aed6794a8981de3d9"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -1787,7 +1787,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -2040,7 +2040,7 @@ dependencies = [
"crypto-bigint",
"digest 0.10.7",
"ff",
"generic-array 0.14.7",
"generic-array",
"group",
"hkdf",
"pem-rfc7468",
@ -2514,15 +2514,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "generic-array"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb8bc4c28d15ade99c7e90b219f30da4be5c88e586277e8cbe886beeb868ab2"
dependencies = [
"typenum",
]
[[package]]
name = "gethostname"
version = "0.4.3"
@ -3320,7 +3311,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"block-padding",
"generic-array 0.14.7",
"generic-array",
]
[[package]]
@ -3569,9 +3560,9 @@ dependencies = [
[[package]]
name = "konst"
version = "0.3.14"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b65f00fb3910881e52bf0850ae2a82aea411488a557e1c02820ceaa60963dce3"
checksum = "298ddf99f06a97c1ecd0e910932662b7842855046234b0d0376d35d93add087f"
dependencies = [
"const_panic",
"konst_kernel",
@ -3580,9 +3571,9 @@ dependencies = [
[[package]]
name = "konst_kernel"
version = "0.3.12"
version = "0.3.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "599c1232f55c72c7fc378335a3efe1c878c92720838c8e6a4fd87784ef7764de"
checksum = "e4b1eb7788f3824c629b1116a7a9060d6e898c358ebff59070093d51103dcc3c"
dependencies = [
"typewit",
]
@ -5992,7 +5983,7 @@ checksum = "d3e97a565f76233a6003f9f5c54be1d9c5bdfa3eccfb189469f11ec4901c47dc"
dependencies = [
"base16ct",
"der",
"generic-array 0.14.7",
"generic-array",
"pkcs8",
"subtle",
"zeroize",
@ -6248,7 +6239,7 @@ checksum = "1f606421e4a6012877e893c399822a4ed4b089164c5969424e1b9d1e66e6964b"
dependencies = [
"const-oid",
"digest 0.10.7",
"generic-array 1.1.1",
"generic-array",
]
[[package]]
@ -7287,9 +7278,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "typewit"
version = "1.10.1"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d51dbd25812f740f45e2a9769f84711982e000483b13b73a8a1852e092abac8c"
checksum = "cb77c29baba9e4d3a6182d51fa75e3215c7fd1dab8f4ea9d107c716878e55fc0"
dependencies = [
"typewit_proc_macros",
]

View file

@ -117,6 +117,7 @@ impl Data {
.unwrap_or_else(|| Duration::from_secs(3600)),
),
remote_lists: Default::default(),
asn_geo_data: Default::default(),
}
}
}
@ -152,6 +153,7 @@ impl Default for Data {
Duration::from_secs(3600),
Duration::from_secs(3600),
),
asn_geo_data: Default::default(),
}
}
}

View file

@ -40,7 +40,7 @@ pub mod spamfilter;
pub mod storage;
pub mod telemetry;
pub(crate) const CONNECTION_VARS: &[u32; 7] = &[
pub(crate) const CONNECTION_VARS: &[u32; 9] = &[
V_LISTENER,
V_REMOTE_IP,
V_REMOTE_PORT,
@ -48,6 +48,8 @@ pub(crate) const CONNECTION_VARS: &[u32; 7] = &[
V_LOCAL_PORT,
V_PROTOCOL,
V_TLS,
V_ASN,
V_COUNTRY,
];
impl Core {

View file

@ -4,6 +4,8 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::time::Duration;
use crate::expr::{if_block::IfBlock, tokenizer::TokenMap};
use utils::config::{Config, Rate};
@ -16,6 +18,7 @@ pub struct Network {
pub contact_form: Option<ContactForm>,
pub http_response_url: IfBlock,
pub http_allowed_endpoint: IfBlock,
pub asn_geo_lookup: AsnGeoLookupConfig,
}
#[derive(Clone)]
@ -30,6 +33,31 @@ pub struct ContactForm {
pub field_honey_pot: Option<String>,
}
#[derive(Clone)]
pub enum AsnGeoLookupConfig {
Resource {
expires: Duration,
timeout: Duration,
max_size: usize,
resources: Vec<AsnGeoLookupResource>,
},
Dns {
zone_ipv4: String,
zone_ipv6: String,
separator: String,
index_asn: usize,
index_asn_name: Option<usize>,
index_country: Option<usize>,
},
Disabled,
}
#[derive(Clone)]
pub enum AsnGeoLookupResource {
Asn { url: String, headers: HeaderMap },
Geo { url: String, headers: HeaderMap },
}
#[derive(Clone)]
pub struct FieldOrDefault {
pub field: Option<String>,
@ -62,6 +90,7 @@ impl Default for Network {
"protocol + '://' + key_get('default', 'hostname') + ':' + local_port",
),
http_allowed_endpoint: IfBlock::new::<()>("server.http.allowed-endpoint", [], "200"),
asn_geo_lookup: AsnGeoLookupConfig::Disabled,
}
}
}

View file

@ -54,7 +54,7 @@ pub const THROTTLE_HELO_DOMAIN: u16 = 1 << 9;
pub(crate) const RCPT_DOMAIN_VARS: &[u32; 1] = &[V_RECIPIENT_DOMAIN];
pub(crate) const SMTP_EHLO_VARS: &[u32; 8] = &[
pub(crate) const SMTP_EHLO_VARS: &[u32; 10] = &[
V_LISTENER,
V_REMOTE_IP,
V_REMOTE_PORT,
@ -63,8 +63,10 @@ pub(crate) const SMTP_EHLO_VARS: &[u32; 8] = &[
V_PROTOCOL,
V_TLS,
V_HELO_DOMAIN,
V_ASN,
V_COUNTRY,
];
pub(crate) const SMTP_MAIL_FROM_VARS: &[u32; 10] = &[
pub(crate) const SMTP_MAIL_FROM_VARS: &[u32; 12] = &[
V_LISTENER,
V_REMOTE_IP,
V_REMOTE_PORT,
@ -75,8 +77,10 @@ pub(crate) const SMTP_MAIL_FROM_VARS: &[u32; 10] = &[
V_SENDER,
V_SENDER_DOMAIN,
V_AUTHENTICATED_AS,
V_ASN,
V_COUNTRY,
];
pub(crate) const SMTP_RCPT_TO_VARS: &[u32; 15] = &[
pub(crate) const SMTP_RCPT_TO_VARS: &[u32; 17] = &[
V_SENDER,
V_SENDER_DOMAIN,
V_RECIPIENTS,
@ -92,6 +96,8 @@ pub(crate) const SMTP_RCPT_TO_VARS: &[u32; 15] = &[
V_TLS,
V_PRIORITY,
V_HELO_DOMAIN,
V_ASN,
V_COUNTRY,
];
pub(crate) const SMTP_QUEUE_HOST_VARS: &[u32; 14] = &[
V_SENDER,

View file

@ -7,7 +7,6 @@
use std::{net::SocketAddr, time::Duration};
use ahash::AHashSet;
use hyper::HeaderMap;
use mail_parser::HeaderName;
use utils::{
config::Config,
@ -24,9 +23,8 @@ pub struct SpamFilterConfig {
pub max_rbl_url_checks: usize,
pub greylist_duration: Option<Duration>,
pub pyzor: Option<PyzorConfig>,
pub asn: AsnLookupProvider,
pub reputation: Option<ReputationConfig>,
pub list_dmarc_allow: GlobSet,
pub list_spf_dkim_allow: GlobSet,
@ -41,23 +39,14 @@ pub struct SpamFilterConfig {
}
#[derive(Debug, Clone, Default)]
pub enum AsnLookupProvider {
Dns {
ipv4_zone: String,
ipv6_zone: String,
separator: char,
asn_index: usize,
country_index: Option<usize>,
},
Rest {
api: String,
timeout: Duration,
headers: HeaderMap,
asn_path: Vec<String>,
country_path: Option<Vec<String>>,
},
#[default]
None,
pub struct ReputationConfig {
pub expiry: u64,
pub token_score: f64,
pub factor: f64,
pub ip_weight: f64,
pub domain_weight: f64,
pub asn_weight: f64,
pub sender_weight: f64,
}
#[derive(Debug, Clone)]

View file

@ -197,12 +197,17 @@ impl LicenseKey {
Details = "Attempting to renew Enterprise license from license.stalw.art",
);
match fetch_resource(&format!("{}{}", LICENSING_API, self.domain), headers.into())
.await
.and_then(|bytes| {
String::from_utf8(bytes)
.map_err(|_| String::from("Failed to UTF-8 decode server response"))
}) {
match fetch_resource(
&format!("{}{}", LICENSING_API, self.domain),
headers.into(),
Duration::from_secs(60),
1024,
)
.await
.and_then(|bytes| {
String::from_utf8(bytes)
.map_err(|_| String::from("Failed to UTF-8 decode server response"))
}) {
Ok(encoded_key) => match LicenseKey::new(&encoded_key, &self.domain) {
Ok(key) => Ok(RenewedLicense { key, encoded_key }),
Err(err) => {

View file

@ -35,6 +35,8 @@ pub const V_URL: u32 = 21;
pub const V_URL_PATH: u32 = 22;
pub const V_HEADERS: u32 = 23;
pub const V_METHOD: u32 = 24;
pub const V_ASN: u32 = 25;
pub const V_COUNTRY: u32 = 26;
pub const VARIABLES_MAP: &[(&str, u32)] = &[
("rcpt", V_RECIPIENT),
@ -62,6 +64,8 @@ pub const VARIABLES_MAP: &[(&str, u32)] = &[
("url_path", V_URL_PATH),
("headers", V_HEADERS),
("method", V_METHOD),
("asn", V_ASN),
("country", V_COUNTRY),
];
use regex::Regex;

View file

@ -360,6 +360,8 @@ impl TokenMap {
V_QUEUE_EXPIRES_IN,
V_QUEUE_LAST_STATUS,
V_QUEUE_LAST_ERROR,
V_ASN,
V_COUNTRY,
])
}

View file

@ -29,7 +29,9 @@ use dashmap::DashMap;
use futures::StreamExt;
use imap_proto::protocol::list::Attribute;
use ipc::{DeliveryEvent, HousekeeperEvent, QueueEvent, ReportingEvent, StateEvent};
use listener::{blocked::Security, limiter::ConcurrencyLimiter, tls::AcmeProviders};
use listener::{
asn::AsnGeoLookupData, blocked::Security, limiter::ConcurrencyLimiter, tls::AcmeProviders,
};
use manager::webadmin::{Resource, WebAdminManager};
use nlp::bayes::cache::BayesTokenCache;
@ -92,6 +94,7 @@ pub struct Data {
pub bayes_cache: BayesTokenCache,
pub remote_lists: RwLock<AHashMap<String, RemoteList>>,
pub asn_geo_data: AsnGeoLookupData,
pub jmap_id_gen: SnowflakeIdGenerator,
pub queue_id_gen: SnowflakeIdGenerator,

View file

@ -0,0 +1,403 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{
net::IpAddr,
sync::{atomic::AtomicU64, Arc},
time::{Duration, Instant},
};
use ahash::AHashMap;
use arc_swap::ArcSwap;
use mail_auth::common::resolver::ToReverseName;
use store::write::now;
use tokio::sync::Semaphore;
use crate::{
config::network::{AsnGeoLookupConfig, AsnGeoLookupResource},
manager::fetch_resource,
Server,
};
pub struct AsnGeoLookupData {
pub lock: Semaphore,
expires: AtomicU64,
asn: ArcSwap<Data<Arc<AsnData>>>,
country: ArcSwap<Data<Arc<String>>>,
}
#[derive(Clone, Default, Debug)]
pub struct AsnData {
pub id: u32,
pub name: Option<String>,
}
#[derive(Clone, Default, Debug)]
pub struct AsnGeoLookupResult {
pub asn: Option<Arc<AsnData>>,
pub country: Option<Arc<String>>,
}
struct Data<T> {
ip4_ranges: Vec<IpRange<u32, T>>,
ip6_ranges: Vec<IpRange<u128, T>>,
}
pub struct IpRange<I: Ord, T> {
pub start: I,
pub end: I,
pub data: T,
}
impl Server {
pub async fn lookup_asn_country(&self, ip: IpAddr) -> AsnGeoLookupResult {
let mut result = AsnGeoLookupResult::default();
match &self.core.network.asn_geo_lookup {
AsnGeoLookupConfig::Resource { .. } if !ip.is_loopback() => {
let asn_geo = &self.inner.data.asn_geo_data;
if asn_geo.expires.load(std::sync::atomic::Ordering::Relaxed) <= now()
&& asn_geo.lock.available_permits() > 0
{
self.refresh_asn_geo_tables();
}
result.asn = asn_geo.asn.load().lookup(ip).cloned();
result.country = asn_geo.country.load().lookup(ip).cloned();
}
AsnGeoLookupConfig::Dns {
zone_ipv4,
zone_ipv6,
separator,
index_asn,
index_asn_name,
index_country,
} if !ip.is_loopback() => {
let zone = if ip.is_ipv4() { zone_ipv4 } else { zone_ipv6 };
match self
.core
.smtp
.resolvers
.dns
.txt_raw_lookup(format!("{}.{}.", ip.to_reverse_name(), zone))
.await
.map(String::from_utf8)
{
Ok(Ok(entry)) => {
let mut asn = None;
let mut asn_name = None;
let mut country = None;
for (idx, part) in entry.split(separator).enumerate() {
let part = part.trim();
if !part.is_empty() {
if idx == *index_asn {
asn = part.parse::<u32>().ok();
} else if index_asn_name.map_or(false, |i| i == idx) {
asn_name = Some(part.to_string());
} else if index_country.map_or(false, |i| i == idx) {
country = Some(part.to_string());
}
}
}
if let Some(asn) = asn {
result.asn = Some(Arc::new(AsnData {
id: asn,
name: asn_name,
}));
}
if let Some(country) = country {
result.country = Some(Arc::new(country));
}
}
Ok(Err(_)) => {
trc::event!(
Resource(trc::ResourceEvent::Error),
Details = "Failed to UTF-8 decode ASN/Geo data",
Hostname = format!("{}.{}.", ip.to_reverse_name(), zone),
);
}
Err(err) => {
trc::event!(
Resource(trc::ResourceEvent::Error),
Details = "Failed to lookup ASN/Geo data",
Hostname = format!("{}.{}.", ip.to_reverse_name(), zone),
CausedBy = err.to_string()
);
}
}
}
_ => (),
}
result
}
fn refresh_asn_geo_tables(&self) {
let server = self.clone();
tokio::spawn(async move {
let asn_geo = &server.inner.data.asn_geo_data;
let _permit = asn_geo.lock.acquire().await;
if asn_geo.expires.load(std::sync::atomic::Ordering::Relaxed) > now() {
return;
}
if let AsnGeoLookupConfig::Resource {
expires,
timeout,
max_size,
resources,
} = &server.core.network.asn_geo_lookup
{
let mut asn_data = Data::new();
let mut country_data = Data::new();
for lookup in resources {
let (url, headers, is_asn) = match lookup {
AsnGeoLookupResource::Asn { url, headers } => (url, headers, true),
AsnGeoLookupResource::Geo { url, headers } => (url, headers, false),
};
let time = Instant::now();
match fetch_resource(url, headers.clone().into(), *timeout, *max_size)
.await
.map(String::from_utf8)
{
Ok(Ok(data)) => {
let mut has_errors = false;
let mut asn_mappings = AHashMap::new();
let mut geo_mappings = AHashMap::new();
let mut from_ip = None;
let mut to_ip = None;
let mut asn = None;
let mut details = None;
let mut in_quote = false;
let mut col_num = 0;
let mut col_start = 0;
let mut line_start = 0;
for (idx, ch) in data.char_indices() {
match ch {
'"' => in_quote = !in_quote,
',' | '\n' if !in_quote => {
let column =
data.get(col_start..idx).unwrap_or_default().trim();
match col_num {
0 => from_ip = column.parse::<IpAddr>().ok(),
1 => to_ip = column.parse::<IpAddr>().ok(),
2 if is_asn => asn = column.parse::<u32>().ok(),
2 | 3 => {
let column = column
.strip_prefix('"')
.and_then(|s| s.strip_suffix('"'))
.unwrap_or(column);
if !column.is_empty() || details.is_none() {
details = Some(column);
}
}
_ => break,
}
if ch == '\n' {
let is_success = match (from_ip, to_ip, asn, details) {
(
Some(from_ip),
Some(to_ip),
Some(asn),
asn_name,
) if is_asn => {
let data = asn_mappings
.entry(asn)
.or_insert_with(|| {
Arc::new(AsnData {
id: asn,
name: asn_name.map(String::from),
})
})
.clone();
asn_data.insert(from_ip, to_ip, data)
}
(Some(from_ip), Some(to_ip), _, Some(code))
if !is_asn && [2, 3].contains(&code.len()) =>
{
let code = code.to_uppercase();
let data = geo_mappings
.entry(code.clone())
.or_insert_with(|| Arc::new(code))
.clone();
country_data.insert(from_ip, to_ip, data)
}
(None, None, _, _) => true, // Ignore empty rows
_ => false,
};
if !is_success && !has_errors {
trc::event!(
Resource(trc::ResourceEvent::Error),
Details = "Invalid ASN/Geo data",
Url = url.clone(),
Details = data
.get(line_start..idx)
.unwrap_or_default()
.to_string(),
);
has_errors = true;
}
col_num = 0;
from_ip = None;
to_ip = None;
asn = None;
details = None;
line_start = idx + 1;
} else {
col_num += 1;
}
col_start = idx + 1;
}
_ => {}
}
}
trc::event!(
Resource(trc::ResourceEvent::DownloadExternal),
Details = "Downloaded ASN/Geo data",
Url = url.clone(),
Elapsed = time.elapsed()
);
}
Ok(Err(_)) => {
trc::event!(
Resource(trc::ResourceEvent::Error),
Details = "Failed to UTF-8 decode ASN/Geo data",
Url = url.clone(),
);
}
Err(err) => {
trc::event!(
Resource(trc::ResourceEvent::Error),
Details = "Failed to download ASN/Geo data",
Url = url.clone(),
CausedBy = err
);
}
}
}
let expires = if !asn_data.is_empty() || !country_data.is_empty() {
*expires
} else {
Duration::from_secs(60)
};
if !asn_data.is_empty() {
asn_geo.asn.store(Arc::new(asn_data.sorted()));
}
if !country_data.is_empty() {
asn_geo.country.store(Arc::new(country_data.sorted()));
}
asn_geo.expires.store(
now() + expires.as_secs(),
std::sync::atomic::Ordering::Relaxed,
);
}
});
}
}
impl<T> Data<T> {
fn new() -> Self {
Self {
ip4_ranges: Vec::new(),
ip6_ranges: Vec::new(),
}
}
pub fn lookup(&self, ip: IpAddr) -> Option<&T> {
match ip {
IpAddr::V4(ip) => {
let ip = u32::from(ip);
match self.ip4_ranges.binary_search_by(|range| {
if ip < range.start {
std::cmp::Ordering::Greater
} else if ip > range.end {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Equal
}
}) {
Ok(idx) => Some(&self.ip4_ranges[idx].data),
Err(_) => None,
}
}
IpAddr::V6(ip) => {
let ip = u128::from(ip);
match self.ip6_ranges.binary_search_by(|range| {
if ip < range.start {
std::cmp::Ordering::Greater
} else if ip > range.end {
std::cmp::Ordering::Less
} else {
std::cmp::Ordering::Equal
}
}) {
Ok(idx) => Some(&self.ip6_ranges[idx].data),
Err(_) => None,
}
}
}
}
pub fn insert(&mut self, from_ip: IpAddr, to_ip: IpAddr, data: T) -> bool {
match (from_ip, to_ip) {
(IpAddr::V4(from), IpAddr::V4(to)) => {
self.ip4_ranges.push(IpRange {
start: u32::from(from),
end: u32::from(to),
data,
});
true
}
(IpAddr::V6(from), IpAddr::V6(to)) => {
self.ip6_ranges.push(IpRange {
start: u128::from(from),
end: u128::from(to),
data,
});
true
}
_ => false,
}
}
pub fn sorted(mut self) -> Self {
self.ip4_ranges.sort_unstable_by_key(|range| range.start);
self.ip6_ranges.sort_unstable_by_key(|range| range.start);
self
}
pub fn is_empty(&self) -> bool {
self.ip4_ranges.is_empty() && self.ip6_ranges.is_empty()
}
}
impl Default for AsnGeoLookupData {
fn default() -> Self {
Self {
lock: Semaphore::new(1),
expires: AtomicU64::new(0),
asn: ArcSwap::new(Arc::new(Data::new())),
country: ArcSwap::new(Arc::new(Data::new())),
}
}
}

View file

@ -25,6 +25,7 @@ use crate::{
use self::limiter::{ConcurrencyLimiter, InFlight};
pub mod acme;
pub mod asn;
pub mod blocked;
pub mod limiter;
pub mod listen;

View file

@ -8,7 +8,7 @@ use std::time::Duration;
use hyper::HeaderMap;
use crate::USER_AGENT;
use crate::{HttpLimitResponse, USER_AGENT};
use self::config::ConfigManager;
@ -34,25 +34,48 @@ impl ConfigManager {
format!("Failed to fetch configuration key 'resource.{resource_id}': {err}",)
})?
{
fetch_resource(&url, None).await
fetch_resource(&url, None, Duration::from_secs(60), MAX_SIZE).await
} else {
match resource_id {
"spam-filter" => fetch_resource(DEFAULT_SPAMFILTER_URL, None).await,
"webadmin" => fetch_resource(DEFAULT_WEBADMIN_URL, None).await,
"spam-filter" => {
fetch_resource(
DEFAULT_SPAMFILTER_URL,
None,
Duration::from_secs(60),
MAX_SIZE,
)
.await
}
"webadmin" => {
fetch_resource(
DEFAULT_WEBADMIN_URL,
None,
Duration::from_secs(60),
MAX_SIZE,
)
.await
}
_ => Err(format!("Unknown resource: {resource_id}")),
}
}
}
}
pub async fn fetch_resource(url: &str, headers: Option<HeaderMap>) -> Result<Vec<u8>, String> {
const MAX_SIZE: usize = 100 * 1024 * 1024;
pub async fn fetch_resource(
url: &str,
headers: Option<HeaderMap>,
timeout: Duration,
max_size: usize,
) -> Result<Vec<u8>, String> {
if let Some(path) = url.strip_prefix("file://") {
tokio::fs::read(path)
.await
.map_err(|err| format!("Failed to read {path}: {err}"))
} else {
let response = reqwest::Client::builder()
.timeout(Duration::from_secs(60))
.timeout(timeout)
.danger_accept_invalid_certs(is_localhost_url(url))
.user_agent(USER_AGENT)
.build()
@ -65,10 +88,10 @@ pub async fn fetch_resource(url: &str, headers: Option<HeaderMap>) -> Result<Vec
if response.status().is_success() {
response
.bytes()
.bytes_with_limit(max_size)
.await
.map_err(|err| format!("Failed to fetch {url}: {err}"))
.map(|bytes| bytes.to_vec())
.and_then(|bytes| bytes.ok_or_else(|| format!("Resource too large: {url}")))
} else {
let code = response.status().canonical_reason().unwrap_or_default();
let reason = response.text().await.unwrap_or_default();

View file

@ -15,6 +15,7 @@ use common::{
auth::AccessToken,
config::smtp::auth::VerifyStrategy,
listener::{
asn::AsnGeoLookupResult,
limiter::{ConcurrencyLimiter, InFlight},
ServerInstance,
},
@ -77,6 +78,7 @@ pub struct SessionData {
pub remote_ip: IpAddr,
pub remote_ip_str: String,
pub remote_port: u16,
pub asn_geo_data: AsnGeoLookupResult,
pub helo_domain: String,
pub mail_from: Option<SessionAddress>,
@ -148,6 +150,7 @@ impl SessionData {
local_port: u16,
remote_ip: IpAddr,
remote_port: u16,
asn_geo_data: AsnGeoLookupResult,
session_id: u64,
) -> Self {
SessionData {
@ -158,6 +161,7 @@ impl SessionData {
local_ip_str: local_ip.to_string(),
remote_ip_str: remote_ip.to_string(),
remote_port,
asn_geo_data,
helo_domain: String::new(),
mail_from: None,
rcpt_to: Vec::new(),
@ -308,6 +312,7 @@ impl SessionData {
remote_port: 0,
local_port: 0,
session_id,
asn_geo_data: AsnGeoLookupResult::default(),
helo_domain: "localhost".into(),
mail_from,
rcpt_to,

View file

@ -916,7 +916,26 @@ impl<T: SessionStream> Session<T> {
);
headers.extend_from_slice(b" [");
headers.extend_from_slice(self.data.remote_ip.to_string().as_bytes());
headers.extend_from_slice(b"])\r\n\t");
headers.extend_from_slice(b"]");
if self.data.asn_geo_data.asn.is_some() || self.data.asn_geo_data.country.is_some() {
headers.extend_from_slice(b" (");
if let Some(asn) = &self.data.asn_geo_data.asn {
headers.extend_from_slice(b"AS");
headers.extend_from_slice(asn.id.to_string().as_bytes());
if let Some(name) = &asn.name {
headers.extend_from_slice(b" ");
headers.extend_from_slice(name.as_bytes());
}
}
if let Some(country) = &self.data.asn_geo_data.country {
if self.data.asn_geo_data.asn.is_some() {
headers.extend_from_slice(b", ");
}
headers.extend_from_slice(country.as_bytes());
}
headers.extend_from_slice(b")");
}
headers.extend_from_slice(b")\r\n\t");
if self.stream.is_tls() {
let (version, cipher) = self.stream.tls_version_and_cipher();
headers.extend_from_slice(b"(using ");

View file

@ -578,6 +578,22 @@ impl<T: SessionStream> ResolveVariable for Session<T> {
V_TLS => self.stream.is_tls().into(),
V_PRIORITY => self.data.priority.to_string().into(),
V_PROTOCOL => self.instance.protocol.as_str().into(),
V_ASN => self
.data
.asn_geo_data
.asn
.as_ref()
.map(|a| a.id)
.unwrap_or_default()
.into(),
V_COUNTRY => self
.data
.asn_geo_data
.country
.as_ref()
.map(|c| c.as_str())
.unwrap_or_default()
.into(),
_ => expr::Variable::default(),
}
}

View file

@ -20,38 +20,35 @@ use crate::{
};
impl SessionManager for SmtpSessionManager {
fn handle<T: SessionStream>(
self,
session: listener::SessionData<T>,
) -> impl std::future::Future<Output = ()> + Send {
// Create session
async fn handle<T: SessionStream>(self, session: listener::SessionData<T>) {
// Build server and create session
let server = self.inner.build_server();
let mut session = Session {
hostname: String::new(),
server: self.inner.build_server(),
instance: session.instance,
state: State::default(),
stream: session.stream,
in_flight: vec![session.in_flight],
data: SessionData::new(
session.local_ip,
session.local_port,
session.remote_ip,
session.remote_port,
server.lookup_asn_country(session.remote_ip).await,
session.session_id,
),
hostname: String::new(),
server,
instance: session.instance,
state: State::default(),
stream: session.stream,
in_flight: vec![session.in_flight],
params: SessionParameters::default(),
};
// Enforce throttle
async {
if session.is_allowed().await
&& session.init_conn().await
&& session.handle_conn().await
&& session.instance.acceptor.is_tls()
{
if let Ok(mut session) = session.into_tls().await {
session.handle_conn().await;
}
if session.is_allowed().await
&& session.init_conn().await
&& session.handle_conn().await
&& session.instance.acceptor.is_tls()
{
if let Ok(mut session) = session.into_tls().await {
session.handle_conn().await;
}
}
}

View file

@ -32,6 +32,24 @@ impl<T: SessionStream> Session<T> {
.duration_since(SystemTime::UNIX_EPOCH)
.map_or(0, |d| d.as_secs()),
)
.set_variable(
"asn",
self.data
.asn_geo_data
.asn
.as_ref()
.map(|r| r.id)
.unwrap_or_default(),
)
.set_variable(
"country",
self.data
.asn_geo_data
.country
.as_ref()
.map(|r| r.as_str())
.unwrap_or_default(),
)
.set_variable(
"spf.result",
self.data

View file

@ -28,6 +28,7 @@ pub mod pyzor;
pub mod received;
pub mod recipient;
pub mod replyto;
pub mod reputation;
pub mod subject;
pub mod url;

View file

@ -0,0 +1,160 @@
use std::future::Future;
use common::Server;
use mail_auth::DmarcResult;
use store::{Deserialize, Serialize};
use crate::{
modules::{key_get, key_set},
SpamFilterContext,
};
pub trait SpamFilterAnalyzeReputation: Sync + Send {
fn spam_filter_analyze_reputation(
&self,
ctx: &mut SpamFilterContext<'_>,
) -> impl Future<Output = ()> + Send;
}
enum Type {
Ip,
From,
Domain,
Asn,
}
#[derive(Debug)]
struct Reputation {
count: u32,
score: f64,
}
impl SpamFilterAnalyzeReputation for Server {
async fn spam_filter_analyze_reputation(&self, ctx: &mut SpamFilterContext<'_>) {
// Obtain sender address
let sender = if !ctx.output.env_from_addr.address.is_empty() {
&ctx.output.env_from_addr
} else {
&ctx.output.from.email
};
// Do not penalize forged domains
let prefix = if matches!(ctx.input.dmarc_result, DmarcResult::Pass) {
""
} else {
"_"
};
let mut types = vec![
(Type::Ip, format!("i:{}", ctx.input.remote_ip)),
(Type::From, format!("f:{}{}", prefix, sender.address)),
(
Type::Domain,
format!("d:{}{}", prefix, sender.domain_part.sld_or_default()),
),
];
// Add ASN
if let Some(asn_id) = &ctx.input.asn {
ctx.result.add_tag(format!("SOURCE_ASN_{asn_id}"));
types.push((Type::Asn, format!("a:{asn_id}")));
}
if let Some(country) = &ctx.input.country {
ctx.result.add_tag(format!("SOURCE_COUNTRY_{country}"));
}
if let Some(config) = &self.core.spam.reputation {
let mut reputation = 0.0;
for (rep_type, key) in types {
let key = key.into_bytes();
let mut token =
match key_get::<Reputation>(self, ctx.input.span_id, key.clone()).await {
Ok(Some(token)) => token,
Ok(None) if !ctx.input.is_test => {
key_set(
self,
ctx.input.span_id,
key,
Reputation {
count: 1,
score: ctx.result.score,
}
.serialize(),
config.expiry.into(),
)
.await;
continue;
}
_ => continue,
};
// Update reputation
token.score = (token.count + 1) as f64
* (ctx.result.score + config.token_score * token.score)
/ (config.token_score * token.count as f64 + 1.0);
token.count += 1;
if !ctx.input.is_test {
key_set(
self,
ctx.input.span_id,
key,
token.serialize(),
config.expiry.into(),
)
.await;
}
// Assign weight
let weight = match rep_type {
Type::Ip => config.ip_weight,
Type::From => config.sender_weight,
Type::Domain => config.domain_weight,
Type::Asn => config.asn_weight,
};
reputation += token.score / token.count as f64 * weight;
}
// Adjust score
if reputation > 0.0 {
ctx.result.score += (reputation - ctx.result.score) * config.factor;
}
}
}
}
impl Serialize for &Reputation {
fn serialize(self) -> Vec<u8> {
let mut buf = Vec::with_capacity(12);
buf.extend_from_slice(&self.count.to_be_bytes());
buf.extend_from_slice(&self.score.to_be_bytes());
buf
}
}
impl Deserialize for Reputation {
fn deserialize(bytes: &[u8]) -> trc::Result<Self> {
if bytes.len() == 12 {
Ok(Reputation {
count: u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]),
score: f64::from_be_bytes([
bytes[4], bytes[5], bytes[6], bytes[7], bytes[8], bytes[9], bytes[10],
bytes[11],
]),
})
} else {
Err(trc::StoreEvent::DataCorruption
.caused_by(trc::location!())
.ctx(trc::Key::Value, bytes))
}
}
}
impl From<store::Value<'_>> for Reputation {
fn from(_: store::Value<'_>) -> Self {
unimplemented!()
}
}

View file

@ -28,6 +28,8 @@ pub struct SpamFilterInput<'x> {
pub remote_ip: IpAddr,
pub ehlo_domain: &'x str,
pub authenticated_as: &'x str,
pub asn: Option<u32>,
pub country: Option<&'x str>,
// TLS
pub tls_version: &'x str,
@ -37,6 +39,8 @@ pub struct SpamFilterInput<'x> {
pub env_from: &'x str,
pub env_from_flags: u64,
pub env_rcpt_to: &'x [&'x str],
pub is_test: bool,
}
pub struct SpamFilterOutput<'x> {
@ -75,6 +79,7 @@ pub enum TextPart<'x> {
#[derive(Debug, Default)]
pub struct SpamFilterResult {
pub tags: AHashSet<String>,
pub score: f64,
pub rbl_ip_checks: usize,
pub rbl_domain_checks: usize,
pub rbl_url_checks: usize,

View file

@ -1,5 +1,34 @@
use common::Server;
use store::{Deserialize, Value};
pub mod dnsbl;
pub mod html;
pub mod pyzor;
pub mod remote_list;
pub mod sanitize;
pub(crate) async fn key_get<T: Deserialize + From<Value<'static>> + std::fmt::Debug + 'static>(
server: &Server,
span_id: u64,
key: impl Into<Vec<u8>>,
) -> Result<Option<T>, ()> {
server
.lookup_store()
.key_get(key.into())
.await
.map_err(|err| {
trc::error!(err.span_id(span_id).caused_by(trc::location!()));
})
}
pub(crate) async fn key_set(
server: &Server,
span_id: u64,
key: Vec<u8>,
value: Vec<u8>,
expires: Option<u64>,
) {
if let Err(err) = server.lookup_store().key_set(key, value, expires).await {
trc::error!(err.span_id(span_id).caused_by(trc::location!()));
}
}

View file

@ -1,78 +0,0 @@
# Obtain sender address and domain
let "rep_from" "envelope.from";
let "rep_from_domain" "envfrom_domain_sld";
if eval "is_empty(rep_from)" {
let "rep_from" "from_addr";
let "rep_from_domain" "from_domain_sld";
}
if eval "env.dmarc.result != 'pass'" {
# Do not penalize forged domains
let "rep_from" "'_' + rep_from";
let "rep_from_domain" "'_' + rep_from_domain";
}
# Lookup ASN
let "asn" "";
if eval "len(env.remote_ip.reverse) <= 15" {
let "asn" "env.remote_ip.reverse + '.origin.asn.cymru.com'";
} else {
let "asn" "env.remote_ip.reverse + '.origin.asn6.cymru.com'";
}
let "asn" "split(dns_query(asn, 'txt'), '|')[0]";
# Generate reputation tokens
let "token_ids" "";
if eval "asn > 0" {
let "token_ids" "['i:' + env.remote_ip, 'f:' + rep_from, 'd:' + rep_from_domain, 'a:' + asn ]";
} else {
let "token_ids" "['i:' + env.remote_ip, 'f:' + rep_from, 'd:' + rep_from_domain ]";
}
# Lookup reputation
let "i" "len(token_ids)";
let "reputation" "0.0";
while "i > 0" {
let "i" "i - 1";
let "token_id" "token_ids[i]";
# Lookup reputation
let "token_rep" "key_get(SPAM_DB, token_id)";
if eval "is_empty(token_rep)" {
# Set reputation
eval "!env.test && key_set(SPAM_DB, token_id, [score, 1], 2592000)";
continue;
}
# Update reputation
let "token_score" "token_rep[0]";
let "token_count" "token_rep[1]";
let "updated_score" "(token_count + 1) * (score + 0.98 * token_score) / (0.98 * token_count + 1)";
eval "!env.test && key_set(SPAM_DB, token_id, [updated_score, token_count + 1], 2592000)";
# Assign weight
let "weight" "";
if eval "starts_with(token_id, 'f:')" {
# Sender address has 50% weight
let "weight" "0.5";
} elsif eval "starts_with(token_id, 'd:')" {
# Sender domain has 20% weight
let "weight" "0.2";
} elsif eval "starts_with(token_id, 'i:')" {
# IP has 20% weight
let "weight" "0.2";
} elsif eval "starts_with(token_id, 'a:')" {
# ASN has 10% weight
let "weight" "0.1";
} else {
continue;
}
let "reputation" "reputation + (token_score / token_count * weight)";
}
# Adjust score using a 0.5 factor
if eval "reputation > 0" {
let "score" "score + (reputation - score) * 0.5";
}

View file

@ -0,0 +1,116 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
#[cfg(test)]
mod tests {
use std::time::{Duration, Instant};
use common::{
config::network::{AsnGeoLookupConfig, AsnGeoLookupResource},
Core, Server,
};
#[tokio::test]
#[ignore]
async fn lookup_asn_country_dns() {
let mut core = Core::default();
core.network.asn_geo_lookup = AsnGeoLookupConfig::Dns {
zone_ipv4: "origin.asn.cymru.com".to_string(),
zone_ipv6: "origin6.asn.cymru.com".to_string(),
separator: '|'.to_string(),
index_asn: 0,
index_asn_name: 3.into(),
index_country: 2.into(),
};
let server = Server {
core: core.into(),
inner: Default::default(),
};
for (ip, asn, asn_name, country) in [
("8.8.8.8", 15169, "arin", "US"),
("1.1.1.1", 13335, "apnic", "AU"),
("2a01:4f9:c011:b43c::1", 24940, "ripencc", "DE"),
("1.33.1.1", 2514, "apnic", "JP"),
] {
let result = server.lookup_asn_country(ip.parse().unwrap()).await;
println!("{ip}: {result:?}");
assert_eq!(result.asn.as_ref().map(|r| r.id), Some(asn));
assert_eq!(
result.asn.as_ref().and_then(|r| r.name.as_deref()),
Some(asn_name)
);
assert_eq!(result.country.as_ref().map(|s| s.as_str()), Some(country));
}
}
#[tokio::test]
#[ignore]
async fn lookup_asn_country_http() {
let mut core = Core::default();
core.network.asn_geo_lookup = AsnGeoLookupConfig::Resource {
expires: Duration::from_secs(86400),
timeout: Duration::from_secs(100),
max_size: 100 * 1024 * 1024,
resources: vec![
AsnGeoLookupResource::Asn {
//url: "file:///Users/me/code/playground/asn-ipv4.csv".to_string(),
url: "https://cdn.jsdelivr.net/npm/@ip-location-db/asn/asn-ipv4.csv".to_string(),
headers: Default::default(),
},
AsnGeoLookupResource::Asn {
//url: "file:///Users/me/code/playground/asn-ipv6.csv".to_string(),
url: "https://cdn.jsdelivr.net/npm/@ip-location-db/asn/asn-ipv6.csv".to_string(),
headers: Default::default(),
},
AsnGeoLookupResource::Geo {
//url: "file:///Users/me/code/playground/geolite2-geo-whois-asn-country-ipv4.csv"
// .to_string(),
url: "https://cdn.jsdelivr.net/npm/@ip-location-db/geolite2-geo-whois-asn-country/geolite2-geo-whois-asn-country-ipv4.csv"
.to_string(),
headers: Default::default(),
},
AsnGeoLookupResource::Geo {
//url: "file:///Users/me/code/playground/geolite2-geo-whois-asn-country-ipv6.csv"
// .to_string(),
url: "https://cdn.jsdelivr.net/npm/@ip-location-db/geolite2-geo-whois-asn-country/geolite2-geo-whois-asn-country-ipv4.csv"
.to_string(),
headers: Default::default(),
},
],
};
let server = Server {
core: core.into(),
inner: Default::default(),
};
server.lookup_asn_country("8.8.8.8".parse().unwrap()).await;
let time = Instant::now();
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
if server.inner.data.asn_geo_data.lock.available_permits() > 0 {
break;
}
}
println!("Fetch took {:?}", time.elapsed());
for (ip, asn, asn_name, country) in [
("8.8.8.8", 15169, "Google LLC", "US"),
("1.1.1.1", 13335, "Cloudflare, Inc.", "AU"),
("2a01:4f9:c011:b43c::1", 24940, "Hetzner Online GmbH", "FI"),
("1.33.1.1", 2514, "NTT PC Communications, Inc.", "JP"),
] {
let result = server.lookup_asn_country(ip.parse().unwrap()).await;
println!("{ip}: {result:?}");
assert_eq!(result.asn.as_ref().map(|r| r.id), Some(asn));
assert_eq!(
result.asn.as_ref().and_then(|r| r.name.as_deref()),
Some(asn_name)
);
assert_eq!(result.country.as_ref().map(|s| s.as_str()), Some(country));
}
}
}

View file

@ -372,6 +372,7 @@ fn milter_address_modifications() {
0,
"127.0.0.1".parse().unwrap(),
0,
Default::default(),
0,
);
@ -478,6 +479,7 @@ fn milter_message_modifications() {
0,
"127.0.0.1".parse().unwrap(),
0,
Default::default(),
0,
);

View file

@ -21,6 +21,7 @@ use smtp::queue::{DeliveryAttempt, Message, QueueId};
use super::{QueueReceiver, ReportReceiver};
pub mod antispam;
pub mod asn;
pub mod auth;
pub mod basic;
pub mod data;

View file

@ -112,6 +112,7 @@ impl TestSession for Session<DummyIo> {
0,
"127.0.0.1".parse().unwrap(),
0,
Default::default(),
0,
),
params: SessionParameters::default(),