From 5fda55064296f1d4e09169f9c886fd2e6d1ae2a6 Mon Sep 17 00:00:00 2001 From: Mauro D Date: Wed, 21 Jun 2023 17:26:28 +0000 Subject: [PATCH] ImapSessionManager implementation --- Cargo.lock | 12 + crates/imap/Cargo.toml | 10 + crates/imap/src/core/client.rs | 350 ++++++++++++++++++++++++++++ crates/imap/src/core/mod.rs | 120 ++++++++++ crates/imap/src/core/session.rs | 214 +++++++++++++++++ crates/imap/src/core/writer.rs | 143 ++++++++++++ crates/imap/src/lib.rs | 43 ++++ crates/smtp/src/inbound/spawn.rs | 30 +-- crates/utils/src/listener/listen.rs | 39 +++- 9 files changed, 930 insertions(+), 31 deletions(-) create mode 100644 crates/imap/src/core/client.rs create mode 100644 crates/imap/src/core/mod.rs create mode 100644 crates/imap/src/core/session.rs create mode 100644 crates/imap/src/core/writer.rs diff --git a/Cargo.lock b/Cargo.lock index d6fa4c1c..1306ca89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1707,6 +1707,18 @@ dependencies = [ [[package]] name = "imap" version = "0.1.0" +dependencies = [ + "directory", + "imap_proto", + "jmap", + "parking_lot", + "rustls 0.21.1", + "rustls-pemfile", + "tokio", + "tokio-rustls 0.24.0", + "tracing", + "utils", +] [[package]] name = "imap_proto" diff --git a/crates/imap/Cargo.toml b/crates/imap/Cargo.toml index 1bb2d4bc..fe6a03d1 100644 --- a/crates/imap/Cargo.toml +++ b/crates/imap/Cargo.toml @@ -5,3 +5,13 @@ edition = "2021" resolver = "2" [dependencies] +imap_proto = { path = "../imap-proto" } +jmap = { path = "../jmap" } +directory = { path = "../directory" } +utils = { path = "../utils" } +rustls = "0.21.0" +rustls-pemfile = "1.0" +tokio = { version = "1.23", features = ["full"] } +tokio-rustls = { version = "0.24.0"} +parking_lot = "0.12" +tracing = "0.1" diff --git a/crates/imap/src/core/client.rs b/crates/imap/src/core/client.rs new file mode 100644 index 00000000..6c83d69c --- /dev/null +++ b/crates/imap/src/core/client.rs @@ -0,0 +1,350 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{iter::Peekable, sync::Arc, vec::IntoIter}; + +use imap_proto::{ + receiver::{self, Request}, + Command, StatusResponse, +}; +use tokio::io::AsyncRead; + +use super::{SelectedMailbox, Session, SessionData, State}; + +impl Session { + pub async fn ingest(&mut self, bytes: &[u8]) -> Result { + /*let tmp = "dd"; + for line in String::from_utf8_lossy(bytes).split("\r\n") { + println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]); + }*/ + + let mut bytes = bytes.iter(); + let mut requests = Vec::with_capacity(2); + let mut needs_literal = None; + + loop { + match self.receiver.parse(&mut bytes) { + Ok(request) => match request.is_allowed(&self.state, self.is_tls) { + Ok(request) => { + requests.push(request); + } + Err(response) => { + self.write_bytes(response.into_bytes()).await?; + } + }, + Err(receiver::Error::NeedsMoreData) => { + break; + } + Err(receiver::Error::NeedsLiteral { size }) => { + needs_literal = size.into(); + break; + } + Err(receiver::Error::Error { response }) => { + self.write_bytes(response.into_bytes()).await?; + break; + } + } + } + + let mut requests = requests.into_iter().peekable(); + while let Some(request) = requests.next() { + match request.command { + Command::List | Command::Lsub => { + //self.handle_list(request).await?; + } + Command::Select | Command::Examine => { + //self.handle_select(request).await?; + } + Command::Create => { + //self.handle_create(group_requests(&mut requests, vec![request])).await?; + } + Command::Delete => { + //self.handle_delete(group_requests(&mut requests, vec![request])).await?; + } + Command::Rename => { + //self.handle_rename(request).await?; + } + Command::Status => { + //self.handle_status(request).await?; + } + Command::Append => { + //self.handle_append(request).await?; + } + Command::Close => { + //self.handle_close(request).await?; + } + Command::Unselect => { + //self.handle_unselect(request).await?; + } + Command::Expunge(is_uid) => { + //self.handle_expunge(request, is_uid).await?; + } + Command::Search(is_uid) => { + //self.handle_search(request, false, is_uid).await?; + } + Command::Fetch(is_uid) => { + //self.handle_fetch(request, is_uid).await?; + } + Command::Store(is_uid) => { + //self.handle_store(request, is_uid).await?; + } + Command::Copy(is_uid) => { + //self.handle_copy_move(request, false, is_uid).await?; + } + Command::Move(is_uid) => { + //self.handle_copy_move(request, true, is_uid).await?; + } + Command::Sort(is_uid) => { + //self.handle_search(request, true, is_uid).await?; + } + Command::Thread(is_uid) => { + //self.handle_thread(request, is_uid).await?; + } + Command::Idle => { + //self.handle_idle(request).await?; + } + Command::Subscribe => { + //self.handle_subscribe(request, true).await?; + } + Command::Unsubscribe => { + //self.handle_subscribe(request, false).await?; + } + Command::Namespace => { + //self.handle_namespace(request).await?; + } + Command::Authenticate => { + //self.handle_authenticate(request).await?; + } + Command::Login => { + //self.handle_login(request).await?; + } + Command::Capability => { + //self.handle_capability(request).await?; + } + Command::Enable => { + //self.handle_enable(request).await?; + } + Command::StartTls => { + //return self.handle_starttls(request).await; + } + Command::Noop => { + //self.handle_noop(request, false).await?; + } + Command::Check => { + //self.handle_noop(request, true).await?; + } + Command::Logout => { + //self.handle_logout(request).await?; + } + Command::SetAcl => { + //self.handle_set_acl(request).await?; + } + Command::DeleteAcl => { + //self.handle_delete_acl(request).await?; + } + Command::GetAcl => { + //self.handle_get_acl(request).await?; + } + Command::ListRights => { + //self.handle_list_rights(request).await?; + } + Command::MyRights => { + //self.handle_my_rights(request).await?; + } + Command::Unauthenticate => { + //self.handle_unauthenticate(request).await?; + } + Command::Id => { + //self.handle_id(request).await?; + } + } + } + + if let Some(needs_literal) = needs_literal { + self.write_bytes(format!("+ Ready for {} bytes.\r\n", needs_literal).into_bytes()) + .await?; + } + + Ok(false) + } +} + +pub fn group_requests( + requests: &mut Peekable>>, + mut grouped_requests: Vec>, +) -> Vec> { + let last_command = grouped_requests.last().unwrap().command; + loop { + match requests.peek() { + Some(request) if request.command == last_command => { + grouped_requests.push(requests.next().unwrap()); + } + _ => break, + } + } + grouped_requests +} + +trait IsAllowed: Sized { + fn is_allowed(self, state: &State, is_tls: bool) -> Result; +} + +impl IsAllowed for Request { + fn is_allowed(self, state: &State, is_tls: bool) -> Result { + match &self.command { + Command::Capability | Command::Noop | Command::Logout | Command::Id => Ok(self), + Command::StartTls => { + if !is_tls { + Ok(self) + } else { + Err(StatusResponse::no("Already in TLS mode.").with_tag(self.tag)) + } + } + Command::Authenticate => { + if let State::NotAuthenticated { .. } = state { + Ok(self) + } else { + Err(StatusResponse::no("Already authenticated.").with_tag(self.tag)) + } + } + Command::Login => { + if let State::NotAuthenticated { .. } = state { + if is_tls { + Ok(self) + } else { + Err( + StatusResponse::no("LOGIN is disabled on the clear-text port.") + .with_tag(self.tag), + ) + } + } else { + Err(StatusResponse::no("Already authenticated.").with_tag(self.tag)) + } + } + Command::Enable + | Command::Select + | Command::Examine + | Command::Create + | Command::Delete + | Command::Rename + | Command::Subscribe + | Command::Unsubscribe + | Command::List + | Command::Lsub + | Command::Namespace + | Command::Status + | Command::Append + | Command::Idle + | Command::SetAcl + | Command::DeleteAcl + | Command::GetAcl + | Command::ListRights + | Command::MyRights + | Command::Unauthenticate => { + if let State::Authenticated { .. } | State::Selected { .. } = state { + Ok(self) + } else { + Err(StatusResponse::no("Not authenticated.").with_tag(self.tag)) + } + } + Command::Close + | Command::Unselect + | Command::Expunge(_) + | Command::Search(_) + | Command::Fetch(_) + | Command::Store(_) + | Command::Copy(_) + | Command::Move(_) + | Command::Check + | Command::Sort(_) + | Command::Thread(_) => match state { + State::Selected { mailbox, .. } => { + if mailbox.is_select + || !matches!( + self.command, + Command::Store(_) | Command::Expunge(_) | Command::Move(_), + ) + { + Ok(self) + } else { + Err(StatusResponse::no("Not permitted in EXAMINE state.") + .with_tag(self.tag)) + } + } + State::Authenticated { .. } => { + Err(StatusResponse::bad("No mailbox is selected.").with_tag(self.tag)) + } + State::NotAuthenticated { .. } => { + Err(StatusResponse::no("Not authenticated.").with_tag(self.tag)) + } + }, + } + } +} + +impl State { + pub fn auth_failures(&self) -> u32 { + match self { + State::NotAuthenticated { auth_failures, .. } => *auth_failures, + _ => unreachable!(), + } + } + + pub fn session_data(&self) -> Arc { + match self { + State::Authenticated { data } => data.clone(), + State::Selected { data, .. } => data.clone(), + _ => unreachable!(), + } + } + + pub fn mailbox_data(&self) -> (Arc, Arc) { + match self { + State::Selected { data, mailbox, .. } => (data.clone(), mailbox.clone()), + _ => unreachable!(), + } + } + + pub fn session_mailbox_data(&self) -> (Arc, Option>) { + match self { + State::Authenticated { data } => (data.clone(), None), + State::Selected { data, mailbox, .. } => (data.clone(), mailbox.clone().into()), + _ => unreachable!(), + } + } + + pub fn select_data(&self) -> (Arc, Arc) { + match self { + State::Selected { data, mailbox } => (data.clone(), mailbox.clone()), + _ => unreachable!(), + } + } + + pub fn is_authenticated(&self) -> bool { + matches!(self, State::Authenticated { .. } | State::Selected { .. }) + } + + pub fn is_mailbox_selected(&self) -> bool { + matches!(self, State::Selected { .. }) + } +} diff --git a/crates/imap/src/core/mod.rs b/crates/imap/src/core/mod.rs new file mode 100644 index 00000000..0382fa79 --- /dev/null +++ b/crates/imap/src/core/mod.rs @@ -0,0 +1,120 @@ +use std::{ + net::{IpAddr, SocketAddr}, + sync::Arc, + time::Duration, +}; + +use imap_proto::{protocol::ProtocolVersion, receiver::Receiver, Command}; +use jmap::{ + auth::{rate_limit::RemoteAddress, AccessToken}, + JMAP, +}; +use tokio::{ + io::{AsyncRead, ReadHalf}, + sync::{mpsc, watch}, +}; +use utils::listener::{limiter::InFlight, ServerInstance}; + +pub mod client; +pub mod session; +pub mod writer; + +#[derive(Clone)] +pub struct ImapSessionManager { + pub jmap: Arc, + pub imap: Arc, +} + +impl ImapSessionManager { + pub fn new(jmap: Arc, imap: Arc) -> Self { + Self { jmap, imap } + } +} + +pub struct IMAP { + pub max_request_size: usize, + pub name_shared: String, + pub name_all: String, + + pub timeout_auth: Duration, + pub timeout_unauth: Duration, + + pub greeting_plain: Vec, + pub greeting_tls: Vec, +} + +pub struct Session { + pub jmap: Arc, + pub imap: Arc, + pub instance: Arc, + pub receiver: Receiver, + pub version: ProtocolVersion, + pub state: State, + pub is_tls: bool, + pub is_condstore: bool, + pub is_qresync: bool, + pub writer: mpsc::Sender, + pub stream_rx: ReadHalf, + pub in_flight: Vec, + pub span: tracing::Span, +} + +pub struct SessionData { + pub core: Arc, + pub writer: mpsc::Sender, + pub access_token: Arc, +} + +pub struct SelectedMailbox { + pub id: MailboxId, + pub state: parking_lot::Mutex, + pub saved_search: parking_lot::Mutex, + pub is_select: bool, + pub is_condstore: bool, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct MailboxId { + pub account_id: u32, + pub mailbox_id: Option, +} + +#[derive(Debug)] +pub struct MailboxData { + pub uid_next: u32, + pub uid_validity: u32, + pub jmap_ids: Vec, + pub imap_uids: Vec, + pub total_messages: usize, + pub last_state: u32, +} + +pub enum SavedSearch { + InFlight { + rx: watch::Receiver>>, + }, + Results { + items: Arc>, + }, + None, +} + +#[derive(Debug, Clone, Copy, Default)] +pub struct ImapId { + pub uid: u32, + pub seqnum: u32, +} + +pub enum State { + NotAuthenticated { + remote_addr: RemoteAddress, + auth_failures: u32, + }, + Authenticated { + data: Arc, + }, + Selected { + data: Arc, + mailbox: Arc, + }, +} diff --git a/crates/imap/src/core/session.rs b/crates/imap/src/core/session.rs new file mode 100644 index 00000000..4ae87d58 --- /dev/null +++ b/crates/imap/src/core/session.rs @@ -0,0 +1,214 @@ +use imap_proto::{protocol::ProtocolVersion, receiver::Receiver}; +use jmap::auth::rate_limit::RemoteAddress; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + sync::oneshot, +}; +use tokio_rustls::server::TlsStream; +use utils::listener::{SessionData, SessionManager}; + +use super::{writer, ImapSessionManager, Session, State}; + +impl SessionManager for ImapSessionManager { + fn spawn(&self, session: SessionData) { + let manager = self.clone(); + + tokio::spawn(async move { + if session.instance.is_tls_implicit { + if let Ok(session) = Session::>::new(session, manager).await { + session.handle_conn().await; + } + } else if let Ok(session) = Session::::new(session, manager).await { + session.handle_conn().await; + } + }); + } + + fn shutdown(&self) { + // No-op + } +} + +impl Session { + pub async fn handle_conn_(&mut self) -> bool { + let mut buf = vec![0; 8192]; + let mut shutdown_rx = self.instance.shutdown_rx.clone(); + + loop { + tokio::select! { + result = tokio::time::timeout( + if !matches!(self.state, State::NotAuthenticated {..}) { + self.imap.timeout_auth + } else { + self.imap.timeout_unauth + }, + self.stream_rx.read(&mut buf)) => { + match result { + Ok(Ok(bytes_read)) => { + if bytes_read > 0 { + match self.ingest(&buf[..bytes_read]).await { + Ok(false) => (), + Ok(true) => { + return true; + } + Err(_) => { + tracing::debug!(parent: &self.span, event = "disconnect", "Disconnecting client."); + break; + } + } + } else { + tracing::debug!(parent: &self.span, event = "close", "IMAP connection closed by client."); + break; + } + }, + Ok(Err(err)) => { + tracing::debug!(parent: &self.span, event = "error", reason = %err, "IMAP connection error."); + break; + }, + Err(_) => { + self.write_bytes(&b"* BYE Connection timed out.\r\n"[..]).await.ok(); + tracing::debug!(parent: &self.span, "IMAP connection timed out."); + break; + } + } + }, + _ = shutdown_rx.changed() => { + self.write_bytes(&b"* BYE Server shutting down.\r\n"[..]).await.ok(); + tracing::debug!(parent: &self.span, event = "shutdown", "IMAP server shutting down."); + break; + } + }; + } + + false + } +} + +impl Session { + pub async fn new( + mut session: utils::listener::SessionData, + manager: ImapSessionManager, + ) -> Result, ()> { + // Write plain text greeting + if let Err(err) = session.stream.write_all(&manager.imap.greeting_plain).await { + tracing::debug!(parent: &session.span, event = "error", reason = %err, "Failed to write greeting."); + return Err(()); + } + + // Split stream into read and write halves + let (stream_rx, stream_tx) = tokio::io::split(session.stream); + + Ok(Session { + receiver: Receiver::with_max_request_size(manager.imap.max_request_size), + version: ProtocolVersion::Rev1, + state: State::NotAuthenticated { + auth_failures: 0, + remote_addr: RemoteAddress::IpAddress(session.remote_ip), + }, + writer: writer::spawn_writer(writer::Event::Stream(stream_tx)), + is_tls: false, + is_condstore: false, + is_qresync: false, + imap: manager.imap, + jmap: manager.jmap, + instance: session.instance, + span: session.span, + in_flight: vec![session.in_flight], + stream_rx, + }) + } + + pub async fn handle_conn(mut self) { + if self.handle_conn_().await && self.instance.tls_acceptor.is_some() { + if let Ok(session) = self.into_tls().await { + session.handle_conn().await; + } + } + } + + pub async fn into_tls(self) -> Result>, ()> { + // Recover WriteHalf from writer + let (tx, rx) = oneshot::channel(); + if let Err(err) = self.writer.send(writer::Event::Upgrade(tx)).await { + tracing::debug!("Failed to write to channel: {}", err); + return Err(()); + } + let stream = if let Ok(stream_tx) = rx.await { + self.stream_rx.unsplit(stream_tx) + } else { + tracing::debug!("Failed to read from channel"); + return Err(()); + }; + + // Upgrade to TLS + let (stream_rx, stream_tx) = + tokio::io::split(self.instance.tls_accept(stream, &self.span).await?); + if let Err(err) = self.writer.send(writer::Event::StreamTls(stream_tx)).await { + tracing::debug!("Failed to send stream: {}", err); + return Err(()); + } + + Ok(Session { + jmap: self.jmap, + imap: self.imap, + instance: self.instance, + receiver: self.receiver, + version: self.version, + state: self.state, + is_tls: true, + is_condstore: self.is_condstore, + is_qresync: self.is_qresync, + writer: self.writer, + span: self.span, + in_flight: self.in_flight, + stream_rx, + }) + } +} + +impl Session> { + pub async fn new( + session: utils::listener::SessionData, + manager: ImapSessionManager, + ) -> Result>, ()> { + // Upgrade to TLS + let mut stream = session + .instance + .tls_accept(session.stream, &session.span) + .await?; + + // Write TLS greeting + let span = session.span; + if let Err(err) = stream.write_all(&manager.imap.greeting_tls).await { + tracing::debug!(parent: &span, event = "error", reason = %err, "Failed to write greeting."); + return Err(()); + } + + // Spit stream into read and write halves + let (stream_rx, stream_tx) = tokio::io::split(stream); + + Ok(Session { + receiver: Receiver::with_max_request_size(manager.imap.max_request_size), + version: ProtocolVersion::Rev1, + state: State::NotAuthenticated { + auth_failures: 0, + remote_addr: RemoteAddress::IpAddress(session.remote_ip), + }, + writer: writer::spawn_writer(writer::Event::StreamTls(stream_tx)), + is_tls: true, + is_condstore: false, + is_qresync: false, + imap: manager.imap, + jmap: manager.jmap, + instance: session.instance, + span, + in_flight: vec![session.in_flight], + stream_rx, + }) + } + + pub async fn handle_conn(mut self) { + self.handle_conn_().await; + } +} diff --git a/crates/imap/src/core/writer.rs b/crates/imap/src/core/writer.rs new file mode 100644 index 00000000..65200743 --- /dev/null +++ b/crates/imap/src/core/writer.rs @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2020-2022, Stalwart Labs Ltd. + * + * This file is part of the Stalwart IMAP Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::borrow::Cow; + +use tokio::{ + io::{AsyncRead, AsyncWriteExt, WriteHalf}, + net::TcpStream, + sync::{mpsc, oneshot}, +}; +use tokio_rustls::server::TlsStream; +use tracing::debug; + +use super::{Session, SessionData}; + +const IPC_CHANNEL_BUFFER: usize = 128; + +pub enum Event { + Stream(WriteHalf), + StreamTls(WriteHalf>), + Bytes(Cow<'static, [u8]>), + Upgrade(oneshot::Sender>), +} + +pub fn spawn_writer(mut stream: Event) -> mpsc::Sender { + let (tx, mut rx) = mpsc::channel::(IPC_CHANNEL_BUFFER); + tokio::spawn(async move { + 'outer: loop { + match stream { + Event::Stream(mut stream_tx) => { + while let Some(event) = rx.recv().await { + match event { + Event::Bytes(bytes) => { + /*let tmp = "dd"; + println!( + "<- {:?}", + String::from_utf8_lossy( + &bytes[..std::cmp::min(bytes.len(), 100)] + ) + );*/ + + if let Err(err) = stream_tx.write_all(bytes.as_ref()).await { + debug!("Failed to write to stream: {}", err); + break 'outer; + } + } + Event::Upgrade(channel) => { + if channel.send(stream_tx).is_err() { + debug!("Failed to send stream."); + break 'outer; + } + if let Some(stream_) = rx.recv().await { + stream = stream_; + continue 'outer; + } else { + break 'outer; + } + } + _ => { + stream = event; + continue 'outer; + } + } + } + break 'outer; + } + Event::StreamTls(mut stream_tx) => { + while let Some(event) = rx.recv().await { + match event { + Event::Bytes(bytes) => { + if let Err(err) = stream_tx.write_all(bytes.as_ref()).await { + debug!("Failed to write to stream: {}", err); + break 'outer; + } + } + _ => { + stream = event; + continue 'outer; + } + } + } + break 'outer; + } + _ => unreachable!(), + } + } + }); + tx +} + +impl Session { + pub async fn write_bytes(&self, bytes: impl Into>) -> Result<(), ()> { + /*let tmp = "dd"; + println!( + "-> {:?}", + String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)]) + );*/ + + if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await { + debug!("Failed to send bytes: {}", err); + Err(()) + } else { + Ok(()) + } + } +} + +impl SessionData { + pub async fn write_bytes(&self, bytes: impl Into>) -> bool { + /*let tmp = "dd"; + println!( + "-> {:?}", + String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)]) + );*/ + + if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await { + debug!("Failed to send bytes: {}", err); + false + } else { + true + } + } +} diff --git a/crates/imap/src/lib.rs b/crates/imap/src/lib.rs index e69de29b..814538e3 100644 --- a/crates/imap/src/lib.rs +++ b/crates/imap/src/lib.rs @@ -0,0 +1,43 @@ +use std::sync::Arc; + +use crate::core::IMAP; + +use directory::DirectoryConfig; +use imap_proto::{protocol::capability::Capability, ResponseCode, StatusResponse}; +use utils::config::Config; + +pub mod core; + +static SERVER_GREETING: &str = concat!( + "Stalwart IMAP4rev2 v", + env!("CARGO_PKG_VERSION"), + " at your service." +); + +impl IMAP { + pub async fn init(config: &Config, directory: &DirectoryConfig) -> Result, String> { + Ok(Arc::new(IMAP { + max_request_size: config.property_or_static("imap.request.max-size", "52428800")?, + name_shared: config + .value("imap.folders.name.shared") + .unwrap_or("Shared Folders") + .to_string(), + name_all: config + .value("imap.folders.name.all") + .unwrap_or("All Mail") + .to_string(), + greeting_plain: StatusResponse::ok(SERVER_GREETING) + .with_code(ResponseCode::Capability { + capabilities: Capability::all_capabilities(false, false), + }) + .into_bytes(), + timeout_auth: config.property_or_static("imap.timeout.authenticated", "30m")?, + timeout_unauth: config.property_or_static("imap.timeout.anonymous", "1m")?, + greeting_tls: StatusResponse::ok(SERVER_GREETING) + .with_code(ResponseCode::Capability { + capabilities: Capability::all_capabilities(false, true), + }) + .into_bytes(), + })) + } +} diff --git a/crates/smtp/src/inbound/spawn.rs b/crates/smtp/src/inbound/spawn.rs index d78c7a64..3a48f26e 100644 --- a/crates/smtp/src/inbound/spawn.rs +++ b/crates/smtp/src/inbound/spawn.rs @@ -85,35 +85,7 @@ impl Session { pub async fn into_tls(self) -> Result>, ()> { let span = self.span; Ok(Session { - stream: match self - .instance - .tls_acceptor - .as_ref() - .unwrap() - .accept(self.stream) - .await - { - Ok(stream) => { - tracing::info!( - parent: &span, - context = "tls", - event = "handshake", - version = ?stream.get_ref().1.protocol_version().unwrap_or(rustls::ProtocolVersion::TLSv1_3), - cipher = ?stream.get_ref().1.negotiated_cipher_suite().unwrap_or(rustls::cipher_suite::TLS13_AES_128_GCM_SHA256), - ); - stream - } - Err(err) => { - tracing::debug!( - parent: &span, - context = "tls", - event = "error", - "Failed to accept TLS connection: {}", - err - ); - return Err(()); - } - }, + stream: self.instance.tls_accept(self.stream, &span).await?, state: self.state, data: self.data, instance: self.instance, diff --git a/crates/utils/src/listener/listen.rs b/crates/utils/src/listener/listen.rs index b52d8c91..c93f26f8 100644 --- a/crates/utils/src/listener/listen.rs +++ b/crates/utils/src/listener/listen.rs @@ -23,8 +23,12 @@ use std::sync::Arc; -use tokio::{net::TcpListener, sync::watch}; -use tokio_rustls::TlsAcceptor; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::watch, +}; +use tokio_rustls::{server::TlsStream, TlsAcceptor}; +use tracing::Span; use crate::{ config::{Config, Listener, Server, ServerProtocol, Servers}, @@ -186,3 +190,34 @@ impl Listener { listener } } + +impl ServerInstance { + pub async fn tls_accept( + &self, + stream: TcpStream, + span: &Span, + ) -> Result, ()> { + match self.tls_acceptor.as_ref().unwrap().accept(stream).await { + Ok(stream) => { + tracing::info!( + parent: span, + context = "tls", + event = "handshake", + version = ?stream.get_ref().1.protocol_version().unwrap_or(rustls::ProtocolVersion::TLSv1_3), + cipher = ?stream.get_ref().1.negotiated_cipher_suite().unwrap_or(rustls::cipher_suite::TLS13_AES_128_GCM_SHA256), + ); + Ok(stream) + } + Err(err) => { + tracing::debug!( + parent: span, + context = "tls", + event = "error", + "Failed to accept TLS connection: {}", + err + ); + Err(()) + } + } + } +}