Directory implementation - part 1

This commit is contained in:
Mauro D 2023-05-30 17:27:54 +00:00
parent ab96cf0b6e
commit 5f76a1672d
19 changed files with 1505 additions and 2 deletions

89
Cargo.lock generated
View file

@ -302,6 +302,19 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bb8"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1627eccf3aa91405435ba240be23513eeca466b5dc33866422672264de061582"
dependencies = [
"async-trait",
"futures-channel",
"futures-util",
"parking_lot",
"tokio",
]
[[package]]
name = "bincode"
version = "1.3.3"
@ -783,6 +796,23 @@ dependencies = [
"subtle",
]
[[package]]
name = "directory"
version = "0.1.0"
dependencies = [
"ahash 0.8.3",
"async-trait",
"bb8",
"ldap3",
"mail-send",
"rustls 0.21.1",
"smtp-proto",
"sqlx",
"tokio",
"tokio-rustls 0.24.0",
"tracing",
]
[[package]]
name = "displaydoc"
version = "0.2.4"
@ -1664,6 +1694,7 @@ dependencies = [
"base64 0.21.1",
"bincode",
"chrono",
"directory",
"ece",
"form-data",
"form_urlencoded",
@ -1760,6 +1791,43 @@ version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"
[[package]]
name = "lber"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5d85f5e00e12cb50c70c3b1c1f0daff6546eb4c608b44d0a990e38a539e0446"
dependencies = [
"bytes",
"nom",
]
[[package]]
name = "ldap3"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c5cfbd3c59ca16d6671b002b8b3dd013cd825d9c77a1664a3135194d3270511e"
dependencies = [
"async-trait",
"bytes",
"futures",
"futures-util",
"lazy_static",
"lber",
"log",
"nom",
"percent-encoding",
"ring",
"rustls 0.20.8",
"rustls-native-certs",
"thiserror",
"tokio",
"tokio-rustls 0.23.4",
"tokio-stream",
"tokio-util",
"url",
"x509-parser 0.14.0",
]
[[package]]
name = "libc"
version = "0.2.144"
@ -1929,6 +1997,7 @@ dependencies = [
name = "mail-server"
version = "0.3.0"
dependencies = [
"directory",
"jmap",
"jmap_proto",
"smtp",
@ -3336,7 +3405,7 @@ dependencies = [
"tracing",
"utils",
"webpki-roots 0.23.0",
"x509-parser",
"x509-parser 0.15.0",
]
[[package]]
@ -4711,6 +4780,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "x509-parser"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0ecbeb7b67ce215e40e3cc7f2ff902f94a223acf44995934763467e7b1febc8"
dependencies = [
"asn1-rs",
"base64 0.13.1",
"data-encoding",
"der-parser",
"lazy_static",
"nom",
"oid-registry",
"rusticata-macros",
"thiserror",
"time 0.3.21",
]
[[package]]
name = "x509-parser"
version = "0.15.0"

View file

@ -20,6 +20,7 @@ store = { path = "crates/store" }
jmap = { path = "crates/jmap" }
jmap_proto = { path = "crates/jmap-proto" }
smtp = { path = "crates/smtp", features = ["local_delivery"] }
directory = { path = "crates/directory" }
utils = { path = "crates/utils" }
tokio = { version = "1.23", features = ["full"] }
tracing = "0.1"
@ -35,6 +36,7 @@ members = [
"crates/jmap-proto",
"crates/smtp",
"crates/store",
"crates/directory",
"crates/utils",
"crates/maybe-async",
"tests",

View file

@ -0,0 +1,21 @@
[package]
name = "directory"
version = "0.1.0"
edition = "2021"
resolver = "2"
[dependencies]
smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] }
tokio = { version = "1.23", features = ["net"] }
tokio-rustls = { version = "0.24.0"}
rustls = "0.21.0"
sqlx = { version = "0.7.0-alpha.3", features = [ "runtime-tokio-rustls", "postgres", "mysql", "sqlite" ] }
ldap3 = { version = "0.11.1", default-features = false, features = ["tls-rustls"] }
bb8 = "0.8.0"
async-trait = "0.1.68"
ahash = { version = "0.8" }
tracing = "0.1"
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }

View file

@ -0,0 +1,168 @@
use mail_send::Credentials;
use smtp_proto::{
request::{parser::Rfc5321Parser, AUTH},
response::generate::BitToString,
IntoString, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2,
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use super::{ImapClient, ImapError};
impl<T: AsyncRead + AsyncWrite + Unpin> ImapClient<T> {
pub async fn authenticate(
&mut self,
mechanism: u64,
credentials: &Credentials<String>,
) -> Result<(), ImapError> {
if (mechanism & (AUTH_PLAIN | AUTH_XOAUTH2 | AUTH_OAUTHBEARER)) != 0 {
self.write(
format!(
"C3 AUTHENTICATE {} {}\r\n",
mechanism.to_mechanism(),
credentials
.encode(mechanism, "")
.map_err(|err| ImapError::InvalidChallenge(err.to_string()))?
)
.as_bytes(),
)
.await?;
} else {
self.write(format!("C3 AUTHENTICATE {}\r\n", mechanism.to_mechanism()).as_bytes())
.await?;
}
let mut line = self.read_line().await?;
for _ in 0..3 {
if matches!(line.first(), Some(b'+')) {
self.write(
format!(
"{}\r\n",
credentials
.encode(
mechanism,
std::str::from_utf8(line.get(2..).unwrap_or_default())
.unwrap_or_default()
)
.map_err(|err| ImapError::InvalidChallenge(err.to_string()))?
)
.as_bytes(),
)
.await?;
line = self.read_line().await?;
} else if matches!(line.get(..5), Some(b"C3 OK")) {
return Ok(());
} else if matches!(line.get(..5), Some(b"C3 NO"))
|| matches!(line.get(..6), Some(b"C3 BAD"))
{
return Err(ImapError::AuthenticationFailed);
} else {
return Err(ImapError::InvalidResponse(line.into_string()));
}
}
Err(ImapError::InvalidResponse(line.into_string()))
}
pub async fn authentication_mechanisms(&mut self) -> Result<u64, ImapError> {
tokio::time::timeout(self.timeout, async {
self.write(b"C0 CAPABILITY\r\n").await?;
let line = self.read_line().await?;
if !matches!(line.get(..12), Some(b"* CAPABILITY")) {
return Err(ImapError::InvalidResponse(line.into_string()));
}
let mut line_iter = line.iter();
let mut parser = Rfc5321Parser::new(&mut line_iter);
let mut mechanisms = 0;
'outer: while let Ok(ch) = parser.read_char() {
if ch == b' ' {
loop {
if parser.hashed_value().unwrap_or(0) == AUTH && parser.stop_char == b'=' {
if let Ok(Some(mechanism)) = parser.mechanism() {
mechanisms |= mechanism;
}
match parser.stop_char {
b' ' => (),
b'\n' => break 'outer,
_ => break,
}
}
}
} else if ch == b'\n' {
break;
}
}
Ok(mechanisms)
})
.await
.map_err(|_| ImapError::Timeout)?
}
pub async fn noop(&mut self) -> Result<(), ImapError> {
tokio::time::timeout(self.timeout, async {
self.write(b"C8 NOOP\r\n").await?;
self.read_line().await?;
Ok(())
})
.await
.map_err(|_| ImapError::Timeout)?
}
pub async fn logout(&mut self) -> Result<(), ImapError> {
tokio::time::timeout(self.timeout, async {
self.write(b"C9 LOGOUT\r\n").await?;
Ok(())
})
.await
.map_err(|_| ImapError::Timeout)?
}
pub async fn expect_greeting(&mut self) -> Result<(), ImapError> {
tokio::time::timeout(self.timeout, async {
let line = self.read_line().await?;
if matches!(line.get(..4), Some(b"* OK")) {
Ok(())
} else {
Err(ImapError::InvalidResponse(line.into_string()))
}
})
.await
.map_err(|_| ImapError::Timeout)?
}
pub async fn read_line(&mut self) -> Result<Vec<u8>, ImapError> {
let mut buf = vec![0u8; 1024];
let mut buf_extended = Vec::with_capacity(0);
loop {
let br = self.stream.read(&mut buf).await?;
if br > 0 {
if matches!(buf.get(br - 1), Some(b'\n')) {
//println!("{:?}", std::str::from_utf8(&buf[..br]).unwrap());
return Ok(if buf_extended.is_empty() {
buf.truncate(br);
buf
} else {
buf_extended.extend_from_slice(&buf[..br]);
buf_extended
});
} else if buf_extended.is_empty() {
buf_extended = buf[..br].to_vec();
} else {
buf_extended.extend_from_slice(&buf[..br]);
}
} else {
return Err(ImapError::Disconnected);
}
}
}
pub async fn write(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> {
self.stream.write_all(bytes).await?;
self.stream.flush().await
}
}

View file

@ -0,0 +1,86 @@
use mail_send::Credentials;
use smtp_proto::{AUTH_CRAM_MD5, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2};
use crate::{Directory, DirectoryError, Principal};
use super::{ImapDirectory, ImapError};
#[async_trait::async_trait]
impl Directory for ImapDirectory {
async fn authenticate(
&self,
credentials: &Credentials<String>,
) -> crate::Result<Option<Principal>> {
let mut client = self.pool.get().await?;
let mechanism = match credentials {
Credentials::Plain { .. }
if (client.mechanisms & (AUTH_PLAIN | AUTH_LOGIN | AUTH_CRAM_MD5)) != 0 =>
{
if client.mechanisms & AUTH_CRAM_MD5 != 0 {
AUTH_CRAM_MD5
} else if client.mechanisms & AUTH_PLAIN != 0 {
AUTH_PLAIN
} else {
AUTH_LOGIN
}
}
Credentials::OAuthBearer { .. } if client.mechanisms & AUTH_OAUTHBEARER != 0 => {
AUTH_OAUTHBEARER
}
Credentials::XOauth2 { .. } if client.mechanisms & AUTH_XOAUTH2 != 0 => AUTH_XOAUTH2,
_ => {
tracing::warn!(
context = "remote",
event = "error",
protocol = "imap",
"IMAP server does not offer any supported auth mechanisms.",
);
return Ok(None);
}
};
match client.authenticate(mechanism, credentials).await {
Ok(_) => Ok(Some(Principal::default())),
Err(err) => match &err {
ImapError::AuthenticationFailed => Ok(None),
_ => Err(err.into()),
},
}
}
async fn principal_by_name(&self, _name: &str) -> crate::Result<Option<Principal>> {
Err(DirectoryError::unsupported("imap", "principal_by_name"))
}
async fn principal_by_id(&self, _id: u32) -> crate::Result<Option<Principal>> {
Err(DirectoryError::unsupported("imap", "principal_by_id"))
}
async fn member_of(&self, _principal: &Principal) -> crate::Result<Vec<u32>> {
Err(DirectoryError::unsupported("imap", "member_of"))
}
async fn emails_by_id(&self, _id: u32) -> crate::Result<Vec<String>> {
Err(DirectoryError::unsupported("imap", "emails_by_id"))
}
async fn ids_by_email(&self, _address: &str) -> crate::Result<Vec<u32>> {
Err(DirectoryError::unsupported("imap", "ids_by_email"))
}
async fn rcpt(&self, _address: &str) -> crate::Result<bool> {
Err(DirectoryError::unsupported("imap", "rcpt"))
}
async fn vrfy(&self, _address: &str) -> crate::Result<Vec<String>> {
Err(DirectoryError::unsupported("imap", "vrfy"))
}
async fn expn(&self, _address: &str) -> crate::Result<Vec<String>> {
Err(DirectoryError::unsupported("imap", "expn"))
}
async fn query(&self, _query: &str, _params: &[&str]) -> crate::Result<bool> {
Err(DirectoryError::unsupported("imap", "query"))
}
}

View file

@ -0,0 +1,62 @@
pub mod client;
pub mod lookup;
pub mod pool;
pub mod tls;
use std::{fmt::Display, sync::atomic::AtomicU64, time::Duration};
use bb8::Pool;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsConnector;
pub struct ImapDirectory {
pool: Pool<ImapConnectionManager>,
}
pub struct ImapConnectionManager {
addr: String,
timeout: Duration,
tls_connector: TlsConnector,
tls_hostname: String,
tls_implicit: bool,
mechanisms: AtomicU64,
}
pub struct ImapClient<T: AsyncRead + AsyncWrite> {
stream: T,
mechanisms: u64,
timeout: Duration,
}
#[derive(Debug)]
pub enum ImapError {
Io(std::io::Error),
Timeout,
InvalidResponse(String),
InvalidChallenge(String),
AuthenticationFailed,
TLSInvalidName,
Disconnected,
}
impl From<std::io::Error> for ImapError {
fn from(error: std::io::Error) -> Self {
ImapError::Io(error)
}
}
impl Display for ImapError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ImapError::Io(io) => write!(f, "I/O error: {io}"),
ImapError::Timeout => f.write_str("Connection time-out"),
ImapError::InvalidResponse(response) => write!(f, "Unexpected response: {response:?}"),
ImapError::InvalidChallenge(response) => {
write!(f, "Invalid auth challenge: {response}")
}
ImapError::TLSInvalidName => f.write_str("Invalid TLS name"),
ImapError::Disconnected => f.write_str("Connection disconnected by peer"),
ImapError::AuthenticationFailed => f.write_str("Authentication failed"),
}
}
}

View file

@ -0,0 +1,44 @@
use std::sync::atomic::Ordering;
use bb8::ManageConnection;
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
use super::{ImapClient, ImapConnectionManager, ImapError};
#[async_trait::async_trait]
impl ManageConnection for ImapConnectionManager {
type Connection = ImapClient<TlsStream<TcpStream>>;
type Error = ImapError;
/// Attempts to create a new connection.
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let mut conn = ImapClient::connect(
&self.addr,
self.timeout,
&self.tls_connector,
&self.tls_hostname,
self.tls_implicit,
)
.await?;
// Obtain the list of supported authentication mechanisms.
conn.mechanisms = self.mechanisms.load(Ordering::Relaxed);
if conn.mechanisms == 0 {
conn.mechanisms = conn.authentication_mechanisms().await?;
self.mechanisms.store(conn.mechanisms, Ordering::Relaxed);
}
Ok(conn)
}
/// Determines if the connection is still connected to the database.
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
conn.noop().await
}
/// Synchronously determine if the connection is no longer usable, if possible.
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
false
}
}

View file

@ -0,0 +1,84 @@
use std::time::Duration;
use rustls::ServerName;
use smtp_proto::IntoString;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio_rustls::{client::TlsStream, TlsConnector};
use super::{ImapClient, ImapError};
impl ImapClient<TcpStream> {
async fn start_tls(
mut self,
tls_connector: &TlsConnector,
tls_hostname: &str,
) -> Result<ImapClient<TlsStream<TcpStream>>, ImapError> {
let line = tokio::time::timeout(self.timeout, async {
self.write(b"C7 STARTTLS\r\n").await?;
self.read_line().await
})
.await
.map_err(|_| ImapError::Timeout)??;
if matches!(line.get(..5), Some(b"C7 OK")) {
self.into_tls(tls_connector, tls_hostname).await
} else {
Err(ImapError::InvalidResponse(line.into_string()))
}
}
async fn into_tls(
self,
tls_connector: &TlsConnector,
tls_hostname: &str,
) -> Result<ImapClient<TlsStream<TcpStream>>, ImapError> {
tokio::time::timeout(self.timeout, async {
Ok(ImapClient {
stream: tls_connector
.connect(
ServerName::try_from(tls_hostname)
.map_err(|_| ImapError::TLSInvalidName)?,
self.stream,
)
.await?,
timeout: self.timeout,
mechanisms: self.mechanisms,
})
})
.await
.map_err(|_| ImapError::Timeout)?
}
}
impl ImapClient<TlsStream<TcpStream>> {
pub async fn connect(
addr: impl ToSocketAddrs,
timeout: Duration,
tls_connector: &TlsConnector,
tls_hostname: &str,
tls_implicit: bool,
) -> Result<Self, ImapError> {
let mut client: ImapClient<TcpStream> = tokio::time::timeout(timeout, async {
match TcpStream::connect(addr).await {
Ok(stream) => Ok(ImapClient {
stream,
timeout,
mechanisms: 0,
}),
Err(err) => Err(ImapError::Io(err)),
}
})
.await
.map_err(|_| ImapError::Timeout)??;
if tls_implicit {
let mut client = client.into_tls(tls_connector, tls_hostname).await?;
client.expect_greeting().await?;
Ok(client)
} else {
client.expect_greeting().await?;
client.start_tls(tls_connector, tls_hostname).await
}
}
}

View file

View file

@ -0,0 +1,309 @@
use ldap3::{Scope, SearchEntry};
use mail_send::Credentials;
use crate::{Directory, Principal, Type};
use super::{LdapDirectory, LdapMappings};
#[async_trait::async_trait]
impl Directory for LdapDirectory {
async fn authenticate(
&self,
credentials: &Credentials<String>,
) -> crate::Result<Option<Principal>> {
let (username, secret) = match credentials {
Credentials::Plain { username, secret } => (username, secret),
Credentials::OAuthBearer { token } => (token, token),
Credentials::XOauth2 { username, secret } => (username, secret),
};
match self
.find_principal(&self.mappings.filter_login.build(username))
.await
{
Ok(Some(principal)) => {
if principal.secret.as_ref().map_or(false, |s| s == secret) {
Ok(Some(principal))
} else {
Ok(None)
}
}
result => result,
}
}
async fn principal_by_name(&self, name: &str) -> crate::Result<Option<Principal>> {
self.find_principal(&self.mappings.filter_name.build(name))
.await
}
async fn principal_by_id(&self, id: u32) -> crate::Result<Option<Principal>> {
self.find_principal(&self.mappings.filter_id.build(&id.to_string()))
.await
}
async fn member_of(&self, principal: &Principal) -> crate::Result<Vec<u32>> {
if principal.member_of.is_empty() {
return Ok(Vec::new());
}
let mut conn = self.pool.get().await?;
let mut ids = Vec::with_capacity(principal.member_of.len());
for group in &principal.member_of {
let (rs, _res) = if group.contains('=') {
conn.search(group, Scope::Base, "", &self.mappings.attr_id)
.await?
.success()?
} else {
conn.search(
&self.mappings.base_dn,
Scope::Subtree,
&self.mappings.filter_name.build(group),
&self.mappings.attr_id,
)
.await?
.success()?
};
for entry in rs {
for (attr, value) in SearchEntry::construct(entry).attrs {
if self.mappings.attr_id.contains(&attr) {
if let Some(id) = value.first() {
if let Ok(id) = id.parse() {
ids.push(id);
}
}
}
}
}
}
Ok(ids)
}
async fn emails_by_id(&self, id: u32) -> crate::Result<Vec<String>> {
let (rs, _res) = self
.pool
.get()
.await?
.search(
&self.mappings.base_dn,
Scope::Subtree,
&self.mappings.filter_id.build(&id.to_string()),
&self.mappings.attrs_email,
)
.await?
.success()?;
let mut emails = Vec::new();
for entry in rs {
let entry = SearchEntry::construct(entry);
for attr in &self.mappings.attrs_email {
if let Some(values) = entry.attrs.get(attr) {
for email in values {
if !email.is_empty() {
emails.push(email.to_string());
}
}
}
}
}
Ok(emails)
}
async fn ids_by_email(&self, email: &str) -> crate::Result<Vec<u32>> {
let (rs, _res) = self
.pool
.get()
.await?
.search(
&self.mappings.base_dn,
Scope::Subtree,
&self.mappings.filter_email.build(email),
&self.mappings.attr_id,
)
.await?
.success()?;
let mut ids = Vec::new();
for entry in rs {
let entry = SearchEntry::construct(entry);
for attr in &self.mappings.attr_id {
if let Some(values) = entry.attrs.get(attr) {
for id in values {
if let Ok(id) = id.parse() {
ids.push(id);
}
}
}
}
}
Ok(ids)
}
async fn rcpt(&self, address: &str) -> crate::Result<bool> {
let (rs, _res) = self
.pool
.get()
.await?
.search(
&self.mappings.base_dn,
Scope::Subtree,
&self.mappings.filter_email.build(address),
&self.mappings.attr_email_address,
)
.await?
.success()?;
Ok(!rs.is_empty())
}
async fn vrfy(&self, address: &str) -> crate::Result<Vec<String>> {
let mut stream = self
.pool
.get()
.await?
.streaming_search(
&self.mappings.base_dn,
Scope::Subtree,
&self.mappings.filter_verify.build(address),
&self.mappings.attr_email_address,
)
.await?;
let mut emails = Vec::new();
while let Some(entry) = stream.next().await? {
let entry = SearchEntry::construct(entry);
for attr in &self.mappings.attr_email_address {
if let Some(values) = entry.attrs.get(attr) {
for email in values {
if !email.is_empty() {
emails.push(email.to_string());
}
}
}
}
}
Ok(emails)
}
async fn expn(&self, address: &str) -> crate::Result<Vec<String>> {
let mut stream = self
.pool
.get()
.await?
.streaming_search(
&self.mappings.base_dn,
Scope::Subtree,
&self.mappings.filter_expand.build(address),
&self.mappings.attr_email_address,
)
.await?;
let mut emails = Vec::new();
while let Some(entry) = stream.next().await? {
let entry = SearchEntry::construct(entry);
for attr in &self.mappings.attr_email_address {
if let Some(values) = entry.attrs.get(attr) {
for email in values {
if !email.is_empty() {
emails.push(email.to_string());
}
}
}
}
}
Ok(emails)
}
async fn query(&self, query: &str, params: &[&str]) -> crate::Result<bool> {
let mut conn = self.pool.get().await?;
Ok(if !params.is_empty() {
let mut expanded_query = String::with_capacity(query.len() + params.len() * 2);
for (pos, item) in query.split('$').enumerate() {
if pos > 0 {
if let Some(param) = params.get(pos - 1) {
expanded_query.push_str(param);
}
}
expanded_query.push_str(item);
}
conn.streaming_search(
&self.mappings.base_dn,
Scope::Subtree,
&expanded_query,
Vec::<String>::new(),
)
.await
} else {
conn.streaming_search(
&self.mappings.base_dn,
Scope::Subtree,
query,
Vec::<String>::new(),
)
.await
}?
.next()
.await?
.is_some())
}
}
impl LdapDirectory {
async fn find_principal(&self, filter: &str) -> crate::Result<Option<Principal>> {
let (rs, _res) = self
.pool
.get()
.await?
.search(
&self.mappings.base_dn,
Scope::Subtree,
filter,
&self.mappings.attrs_principal,
)
.await?
.success()?;
Ok(rs.into_iter().next().map(|entry| {
self.mappings
.entry_to_principal(SearchEntry::construct(entry))
}))
}
}
impl LdapMappings {
pub fn entry_to_principal(&self, entry: SearchEntry) -> Principal {
let mut principal = Principal {
id: u32::MAX,
..Default::default()
};
for (attr, value) in entry.attrs {
if self.attr_id.contains(&attr) {
if let Ok(id) = value.into_iter().next().unwrap_or_default().parse() {
principal.id = id;
}
} else if self.attr_name.contains(&attr) {
principal.name = value.into_iter().next().unwrap_or_default();
} else if self.attr_secret.contains(&attr) {
principal.secret = value.into_iter().next();
} else if self.attr_description.contains(&attr) {
principal.description = value.into_iter().next();
} else if self.attr_groups.contains(&attr) {
principal.member_of.extend(value);
} else if self.attr_quota.contains(&attr) {
if let Ok(quota) = value.into_iter().next().unwrap_or_default().parse() {
principal.quota = quota;
}
} else if attr.eq_ignore_ascii_case("objectClass") {
if value.contains(&self.obj_user) {
principal.typ = Type::Individual;
} else if value.contains(&self.obj_group) {
principal.typ = Type::Group;
}
}
}
principal
}
}

View file

@ -0,0 +1,69 @@
use bb8::Pool;
use ldap3::LdapConnSettings;
pub mod config;
pub mod lookup;
pub mod pool;
pub struct LdapDirectory {
pool: Pool<LdapConnectionManager>,
mappings: LdapMappings,
}
pub struct LdapMappings {
base_dn: String,
filter_login: LdapFilter,
filter_name: LdapFilter,
filter_email: LdapFilter,
filter_id: LdapFilter,
filter_verify: LdapFilter,
filter_expand: LdapFilter,
obj_user: String,
obj_group: String,
attr_name: Vec<String>,
attr_description: Vec<String>,
attr_secret: Vec<String>,
attr_groups: Vec<String>,
attr_id: Vec<String>,
attr_email_address: Vec<String>,
attr_quota: Vec<String>,
attrs_principal: Vec<String>,
attrs_email: Vec<String>,
}
struct LdapFilter {
filter: Vec<String>,
}
impl LdapFilter {
pub fn build(&self, value: &str) -> String {
self.filter.join(value)
}
}
pub(crate) struct LdapConnectionManager {
address: String,
settings: LdapConnSettings,
bind_dn: Option<Bind>,
}
pub(crate) struct Bind {
dn: String,
password: String,
}
impl LdapConnectionManager {
pub fn new(address: String, settings: LdapConnSettings, bind_dn: Option<Bind>) -> Self {
Self {
address,
settings,
bind_dn,
}
}
}
impl Bind {
pub fn new(dn: String, password: String) -> Self {
Self { dn, password }
}
}

View file

@ -0,0 +1,33 @@
use bb8::ManageConnection;
use ldap3::{exop::WhoAmI, Ldap, LdapConnAsync, LdapError};
use super::LdapConnectionManager;
#[async_trait::async_trait]
impl ManageConnection for LdapConnectionManager {
type Connection = Ldap;
type Error = LdapError;
/// Attempts to create a new connection.
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let (conn, mut ldap) =
LdapConnAsync::with_settings(self.settings.clone(), &self.address).await?;
ldap3::drive!(conn);
if let Some(bind) = &self.bind_dn {
ldap.simple_bind(&bind.dn, &bind.password).await?;
}
Ok(ldap)
}
/// Determines if the connection is still connected to the database.
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
conn.extended(WhoAmI).await.map(|_| ())
}
/// Synchronously determine if the connection is no longer usable, if possible.
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
conn.is_closed()
}
}

167
crates/directory/src/lib.rs Normal file
View file

@ -0,0 +1,167 @@
use bb8::RunError;
use imap::ImapError;
use ldap3::LdapError;
use mail_send::Credentials;
pub mod imap;
pub mod ldap;
pub mod smtp;
pub mod sql;
#[derive(Debug, Default)]
pub struct Principal {
pub id: u32,
pub name: String,
pub secret: Option<String>,
pub typ: Type,
pub description: Option<String>,
pub quota: u32,
pub member_of: Vec<String>,
}
#[derive(Debug, Default, Clone, Copy)]
pub enum Type {
Individual,
Group,
Resource,
Location,
#[default]
Other,
}
pub enum DirectoryError {
Ldap(LdapError),
Sql(sqlx::Error),
Imap(ImapError),
Smtp(mail_send::Error),
TimedOut,
Unsupported,
}
#[async_trait::async_trait]
pub trait Directory {
async fn authenticate(&self, credentials: &Credentials<String>) -> Result<Option<Principal>>;
async fn principal_by_name(&self, name: &str) -> Result<Option<Principal>>;
async fn principal_by_id(&self, id: u32) -> Result<Option<Principal>>;
async fn member_of(&self, principal: &Principal) -> Result<Vec<u32>>;
async fn emails_by_id(&self, id: u32) -> Result<Vec<String>>;
async fn ids_by_email(&self, email: &str) -> Result<Vec<u32>>;
async fn rcpt(&self, address: &str) -> crate::Result<bool>;
async fn vrfy(&self, address: &str) -> Result<Vec<String>>;
async fn expn(&self, address: &str) -> Result<Vec<String>>;
async fn query(&self, query: &str, params: &[&str]) -> Result<bool>;
}
pub type Result<T> = std::result::Result<T, DirectoryError>;
impl From<LdapError> for DirectoryError {
fn from(error: LdapError) -> Self {
DirectoryError::Ldap(error)
}
}
impl From<RunError<LdapError>> for DirectoryError {
fn from(error: RunError<LdapError>) -> Self {
match error {
RunError::User(error) => DirectoryError::Ldap(error),
RunError::TimedOut => DirectoryError::TimedOut,
}
}
}
impl From<RunError<ImapError>> for DirectoryError {
fn from(error: RunError<ImapError>) -> Self {
match error {
RunError::User(error) => DirectoryError::Imap(error),
RunError::TimedOut => DirectoryError::TimedOut,
}
}
}
impl From<RunError<mail_send::Error>> for DirectoryError {
fn from(error: RunError<mail_send::Error>) -> Self {
match error {
RunError::User(error) => DirectoryError::Smtp(error),
RunError::TimedOut => DirectoryError::TimedOut,
}
}
}
impl From<sqlx::Error> for DirectoryError {
fn from(error: sqlx::Error) -> Self {
DirectoryError::Sql(error)
}
}
impl From<ImapError> for DirectoryError {
fn from(error: ImapError) -> Self {
DirectoryError::Imap(error)
}
}
impl From<mail_send::Error> for DirectoryError {
fn from(error: mail_send::Error) -> Self {
DirectoryError::Smtp(error)
}
}
impl DirectoryError {
pub fn unsupported(protocol: &str, method: &str) -> Self {
tracing::warn!(
context = "remote",
event = "error",
protocol = protocol,
method = method,
"Method not supported by directory"
);
DirectoryError::Unsupported
}
}
#[cfg(test)]
mod tests {
use ldap3::{LdapConnAsync, LdapConnSettings, Scope, SearchEntry};
use crate::ldap::{Bind, LdapConnectionManager};
#[tokio::test]
async fn ldap() {
let manager = LdapConnectionManager::new(
"ldap://localhost:3893".to_string(),
LdapConnSettings::new(),
Bind::new(
"cn=serviceuser,ou=svcaccts,dc=example,dc=com".into(),
"mysecret".into(),
)
.into(),
);
let pool = bb8::Pool::builder()
.min_idle(None)
.max_size(10)
.max_lifetime(std::time::Duration::from_secs(30 * 60).into())
.idle_timeout(std::time::Duration::from_secs(10 * 60).into())
.connection_timeout(std::time::Duration::from_secs(30))
.test_on_check_out(true)
.build(manager)
.await
.unwrap();
let mut ldap = pool.get().await.unwrap();
let (rs, _res) = ldap
.search(
"dc=example,dc=com",
Scope::Subtree,
"(&(objectClass=posixAccount)(cn=johndoe))",
vec!["cocomiel", "cn", "uidNumber"], //Vec::<String>::new(),
)
.await
.unwrap()
.success()
.unwrap();
for entry in rs {
println!("{:#?}", SearchEntry::construct(entry));
}
ldap.unbind().await.unwrap()
}
}

View file

@ -0,0 +1,119 @@
use mail_send::{smtp::AssertReply, Credentials};
use smtp_proto::Severity;
use crate::{Directory, DirectoryError, Principal};
use super::{SmtpClient, SmtpDirectory};
#[async_trait::async_trait]
impl Directory for SmtpDirectory {
async fn authenticate(
&self,
credentials: &Credentials<String>,
) -> crate::Result<Option<Principal>> {
self.pool.get().await?.authenticate(credentials).await
}
async fn principal_by_name(&self, _name: &str) -> crate::Result<Option<Principal>> {
Err(DirectoryError::unsupported("smtp", "principal_by_name"))
}
async fn principal_by_id(&self, _id: u32) -> crate::Result<Option<Principal>> {
Err(DirectoryError::unsupported("smtp", "principal_by_id"))
}
async fn member_of(&self, _principal: &Principal) -> crate::Result<Vec<u32>> {
Err(DirectoryError::unsupported("smtp", "member_of"))
}
async fn emails_by_id(&self, _id: u32) -> crate::Result<Vec<String>> {
Err(DirectoryError::unsupported("smtp", "emails_by_id"))
}
async fn ids_by_email(&self, _address: &str) -> crate::Result<Vec<u32>> {
Err(DirectoryError::unsupported("smtp", "ids_by_email"))
}
async fn rcpt(&self, address: &str) -> crate::Result<bool> {
let mut conn = self.pool.get().await?;
if !conn.sent_mail_from {
conn.client
.cmd(b"MAIL FROM:<>\r\n")
.await?
.assert_positive_completion()?;
conn.sent_mail_from = true;
}
let reply = conn
.client
.cmd(format!("RCPT TO:<{address}>\r\n").as_bytes())
.await?;
match reply.severity() {
Severity::PositiveCompletion => {
conn.num_rcpts += 1;
if conn.num_rcpts >= conn.max_rcpt {
let _ = conn.client.rset().await;
conn.num_rcpts = 0;
conn.sent_mail_from = false;
}
Ok(true)
}
Severity::PermanentNegativeCompletion => Ok(false),
_ => Err(mail_send::Error::UnexpectedReply(reply).into()),
}
}
async fn vrfy(&self, address: &str) -> crate::Result<Vec<String>> {
self.pool
.get()
.await?
.expand(&format!("VRFY {address}\r\n"))
.await
}
async fn expn(&self, address: &str) -> crate::Result<Vec<String>> {
self.pool
.get()
.await?
.expand(&format!("EXPN {address}\r\n"))
.await
}
async fn query(&self, _query: &str, _params: &[&str]) -> crate::Result<bool> {
Err(DirectoryError::unsupported("smtp", "query"))
}
}
impl SmtpClient {
async fn authenticate(
&mut self,
credentials: &Credentials<String>,
) -> crate::Result<Option<Principal>> {
match self
.client
.authenticate(credentials, &self.capabilities)
.await
{
Ok(_) => Ok(Some(Principal::default())),
Err(err) => match &err {
mail_send::Error::AuthenticationFailed(err) if err.code() == 535 => {
self.num_auth_failures += 1;
Ok(None)
}
_ => Err(err.into()),
},
}
}
async fn expand(&mut self, command: &str) -> crate::Result<Vec<String>> {
let reply = self.client.cmd(command.as_bytes()).await?;
match reply.code() {
250 | 251 => Ok(reply
.message()
.split('\n')
.map(|p| p.to_string())
.collect::<Vec<String>>()),
550 | 551 | 553 | 500 | 502 => Err(DirectoryError::Unsupported),
_ => Err(mail_send::Error::UnexpectedReply(reply).into()),
}
}
}

View file

@ -0,0 +1,28 @@
pub mod lookup;
pub mod pool;
use bb8::Pool;
use mail_send::SmtpClientBuilder;
use smtp_proto::EhloResponse;
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
pub struct SmtpDirectory {
pool: Pool<SmtpConnectionManager>,
}
pub struct SmtpConnectionManager {
builder: SmtpClientBuilder<String>,
max_rcpt: usize,
max_auth_errors: usize,
}
pub struct SmtpClient {
client: mail_send::SmtpClient<TlsStream<TcpStream>>,
capabilities: EhloResponse<String>,
max_rcpt: usize,
max_auth_errors: usize,
num_rcpts: usize,
num_auth_failures: usize,
sent_mail_from: bool,
}

View file

@ -0,0 +1,41 @@
use bb8::ManageConnection;
use mail_send::{smtp::AssertReply, Error};
use super::{SmtpClient, SmtpConnectionManager};
#[async_trait::async_trait]
impl ManageConnection for SmtpConnectionManager {
type Connection = SmtpClient;
type Error = Error;
/// Attempts to create a new connection.
async fn connect(&self) -> Result<Self::Connection, Self::Error> {
let mut client = self.builder.connect().await?;
let capabilities = client
.capabilities(&self.builder.local_host, self.builder.is_lmtp)
.await?;
Ok(SmtpClient {
capabilities,
client,
max_auth_errors: self.max_auth_errors,
max_rcpt: self.max_rcpt,
num_rcpts: 0,
num_auth_failures: 0,
sent_mail_from: false,
})
}
/// Determines if the connection is still connected to the database.
async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
conn.client
.cmd(b"NOOP\r\n")
.await?
.assert_positive_completion()
}
/// Synchronously determine if the connection is no longer usable, if possible.
fn has_broken(&self, conn: &mut Self::Connection) -> bool {
conn.num_auth_failures >= conn.max_auth_errors
}
}

View file

@ -0,0 +1,157 @@
use mail_send::Credentials;
use sqlx::{any::AnyRow, Column, Row};
use crate::{Directory, Principal, Type};
use super::{SqlDirectory, SqlMappings};
#[async_trait::async_trait]
impl Directory for SqlDirectory {
async fn authenticate(
&self,
credentials: &Credentials<String>,
) -> crate::Result<Option<Principal>> {
let (username, secret) = match credentials {
Credentials::Plain { username, secret } => (username, secret),
Credentials::OAuthBearer { token } => (token, token),
Credentials::XOauth2 { username, secret } => (username, secret),
};
if let Some(row) = sqlx::query(&self.mappings.query_login)
.bind(username)
.fetch_optional(&self.pool)
.await?
{
self.mappings.row_to_principal(row).map(|p| {
if p.secret.as_ref().map_or(false, |s| s == secret) {
Some(p)
} else {
None
}
})
} else {
Ok(None)
}
}
async fn principal_by_name(&self, name: &str) -> crate::Result<Option<Principal>> {
if let Some(row) = sqlx::query(&self.mappings.query_name)
.bind(name)
.fetch_optional(&self.pool)
.await?
{
self.mappings.row_to_principal(row).map(Some)
} else {
Ok(None)
}
}
async fn principal_by_id(&self, id: u32) -> crate::Result<Option<Principal>> {
if let Some(row) = sqlx::query(&self.mappings.query_id)
.bind(id as i64)
.fetch_optional(&self.pool)
.await?
{
self.mappings.row_to_principal(row).map(Some)
} else {
Ok(None)
}
}
async fn member_of(&self, principal: &Principal) -> crate::Result<Vec<u32>> {
sqlx::query_scalar::<_, i64>(&self.mappings.query_members)
.bind(principal.id as i64)
.fetch_all(&self.pool)
.await
.map(|ids| ids.into_iter().map(|id| id as u32).collect())
.map_err(Into::into)
}
async fn emails_by_id(&self, id: u32) -> crate::Result<Vec<String>> {
sqlx::query_scalar::<_, String>(&self.mappings.query_emails)
.bind(id as i64)
.fetch_all(&self.pool)
.await
.map_err(Into::into)
}
async fn ids_by_email(&self, address: &str) -> crate::Result<Vec<u32>> {
sqlx::query_scalar::<_, i64>(&self.mappings.query_recipients)
.bind(address)
.fetch_all(&self.pool)
.await
.map(|ids| ids.into_iter().map(|id| id as u32).collect())
.map_err(Into::into)
}
async fn rcpt(&self, address: &str) -> crate::Result<bool> {
sqlx::query_scalar::<_, i64>(&self.mappings.query_recipients)
.bind(address)
.fetch_optional(&self.pool)
.await
.map(|id| id.is_some())
.map_err(Into::into)
}
async fn vrfy(&self, address: &str) -> crate::Result<Vec<String>> {
sqlx::query_scalar::<_, String>(&self.mappings.query_verify)
.bind(address)
.fetch_all(&self.pool)
.await
.map_err(Into::into)
}
async fn expn(&self, address: &str) -> crate::Result<Vec<String>> {
sqlx::query_scalar::<_, String>(&self.mappings.query_expand)
.bind(address)
.fetch_all(&self.pool)
.await
.map_err(Into::into)
}
async fn query(&self, query: &str, params: &[&str]) -> crate::Result<bool> {
let mut q = sqlx::query(query);
for param in params {
q = q.bind(param);
}
q.fetch_optional(&self.pool)
.await
.map(|r| r.is_some())
.map_err(Into::into)
}
}
impl SqlMappings {
pub fn row_to_principal(&self, row: AnyRow) -> crate::Result<Principal> {
let mut principal = Principal {
id: u32::MAX,
..Default::default()
};
for col in row.columns() {
let name = col.name();
let idx = col.ordinal();
if name.eq_ignore_ascii_case(&self.column_id) {
principal.id = row.try_get::<i64, _>(idx)? as u32;
} else if name.eq_ignore_ascii_case(&self.column_name) {
principal.name = row.try_get::<Option<String>, _>(idx)?.unwrap_or_default();
} else if name.eq_ignore_ascii_case(&self.column_secret) {
principal.secret = row.try_get::<Option<String>, _>(idx)?;
} else if name.eq_ignore_ascii_case(&self.column_type) {
if let Some(typ) = row.try_get::<Option<String>, _>(idx)? {
match typ.as_str() {
"individual" | "person" | "user" => principal.typ = Type::Individual,
"group" => principal.typ = Type::Group,
_ => (),
}
}
} else if name.eq_ignore_ascii_case(&self.column_description) {
principal.description = row.try_get::<Option<String>, _>(idx)?;
} else if name.eq_ignore_ascii_case(&self.column_quota) {
principal.quota = row.try_get::<i64, _>(idx)? as u32;
}
}
Ok(principal)
}
}

View file

@ -0,0 +1,25 @@
use sqlx::{Any, Pool};
pub mod lookup;
pub struct SqlDirectory {
pool: Pool<Any>,
mappings: SqlMappings,
}
pub(crate) struct SqlMappings {
query_login: String,
query_name: String,
query_id: String,
query_members: String,
query_recipients: String,
query_emails: String,
query_verify: String,
query_expand: String,
column_name: String,
column_description: String,
column_secret: String,
column_id: String,
column_quota: String,
column_type: String,
}

View file

@ -7,8 +7,9 @@ resolver = "2"
[dependencies]
store = { path = "../store" }
jmap_proto = { path = "../jmap-proto" }
utils = { path = "../utils" }
smtp = { path = "../smtp" }
utils = { path = "../utils" }
directory = { path = "../directory" }
smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
mail-parser = { git = "https://github.com/stalwartlabs/mail-parser", features = ["full_encoding", "serde_support", "ludicrous_mode"] }
mail-builder = { git = "https://github.com/stalwartlabs/mail-builder", features = ["ludicrous_mode"] }