mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2024-09-20 07:16:18 +08:00
ImapSessionManager implementation
This commit is contained in:
parent
533e782baa
commit
5fda550642
12
Cargo.lock
generated
12
Cargo.lock
generated
|
@ -1707,6 +1707,18 @@ dependencies = [
|
|||
[[package]]
|
||||
name = "imap"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"directory",
|
||||
"imap_proto",
|
||||
"jmap",
|
||||
"parking_lot",
|
||||
"rustls 0.21.1",
|
||||
"rustls-pemfile",
|
||||
"tokio",
|
||||
"tokio-rustls 0.24.0",
|
||||
"tracing",
|
||||
"utils",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "imap_proto"
|
||||
|
|
|
@ -5,3 +5,13 @@ edition = "2021"
|
|||
resolver = "2"
|
||||
|
||||
[dependencies]
|
||||
imap_proto = { path = "../imap-proto" }
|
||||
jmap = { path = "../jmap" }
|
||||
directory = { path = "../directory" }
|
||||
utils = { path = "../utils" }
|
||||
rustls = "0.21.0"
|
||||
rustls-pemfile = "1.0"
|
||||
tokio = { version = "1.23", features = ["full"] }
|
||||
tokio-rustls = { version = "0.24.0"}
|
||||
parking_lot = "0.12"
|
||||
tracing = "0.1"
|
||||
|
|
350
crates/imap/src/core/client.rs
Normal file
350
crates/imap/src/core/client.rs
Normal file
|
@ -0,0 +1,350 @@
|
|||
/*
|
||||
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
|
||||
*
|
||||
* This file is part of the Stalwart IMAP Server.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of
|
||||
* the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
* in the LICENSE file at the top-level directory of this distribution.
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* You can be released from the requirements of the AGPLv3 license by
|
||||
* purchasing a commercial license. Please contact licensing@stalw.art
|
||||
* for more details.
|
||||
*/
|
||||
|
||||
use std::{iter::Peekable, sync::Arc, vec::IntoIter};
|
||||
|
||||
use imap_proto::{
|
||||
receiver::{self, Request},
|
||||
Command, StatusResponse,
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
use super::{SelectedMailbox, Session, SessionData, State};
|
||||
|
||||
impl<T: AsyncRead> Session<T> {
|
||||
pub async fn ingest(&mut self, bytes: &[u8]) -> Result<bool, ()> {
|
||||
/*let tmp = "dd";
|
||||
for line in String::from_utf8_lossy(bytes).split("\r\n") {
|
||||
println!("<- {:?}", &line[..std::cmp::min(line.len(), 100)]);
|
||||
}*/
|
||||
|
||||
let mut bytes = bytes.iter();
|
||||
let mut requests = Vec::with_capacity(2);
|
||||
let mut needs_literal = None;
|
||||
|
||||
loop {
|
||||
match self.receiver.parse(&mut bytes) {
|
||||
Ok(request) => match request.is_allowed(&self.state, self.is_tls) {
|
||||
Ok(request) => {
|
||||
requests.push(request);
|
||||
}
|
||||
Err(response) => {
|
||||
self.write_bytes(response.into_bytes()).await?;
|
||||
}
|
||||
},
|
||||
Err(receiver::Error::NeedsMoreData) => {
|
||||
break;
|
||||
}
|
||||
Err(receiver::Error::NeedsLiteral { size }) => {
|
||||
needs_literal = size.into();
|
||||
break;
|
||||
}
|
||||
Err(receiver::Error::Error { response }) => {
|
||||
self.write_bytes(response.into_bytes()).await?;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut requests = requests.into_iter().peekable();
|
||||
while let Some(request) = requests.next() {
|
||||
match request.command {
|
||||
Command::List | Command::Lsub => {
|
||||
//self.handle_list(request).await?;
|
||||
}
|
||||
Command::Select | Command::Examine => {
|
||||
//self.handle_select(request).await?;
|
||||
}
|
||||
Command::Create => {
|
||||
//self.handle_create(group_requests(&mut requests, vec![request])).await?;
|
||||
}
|
||||
Command::Delete => {
|
||||
//self.handle_delete(group_requests(&mut requests, vec![request])).await?;
|
||||
}
|
||||
Command::Rename => {
|
||||
//self.handle_rename(request).await?;
|
||||
}
|
||||
Command::Status => {
|
||||
//self.handle_status(request).await?;
|
||||
}
|
||||
Command::Append => {
|
||||
//self.handle_append(request).await?;
|
||||
}
|
||||
Command::Close => {
|
||||
//self.handle_close(request).await?;
|
||||
}
|
||||
Command::Unselect => {
|
||||
//self.handle_unselect(request).await?;
|
||||
}
|
||||
Command::Expunge(is_uid) => {
|
||||
//self.handle_expunge(request, is_uid).await?;
|
||||
}
|
||||
Command::Search(is_uid) => {
|
||||
//self.handle_search(request, false, is_uid).await?;
|
||||
}
|
||||
Command::Fetch(is_uid) => {
|
||||
//self.handle_fetch(request, is_uid).await?;
|
||||
}
|
||||
Command::Store(is_uid) => {
|
||||
//self.handle_store(request, is_uid).await?;
|
||||
}
|
||||
Command::Copy(is_uid) => {
|
||||
//self.handle_copy_move(request, false, is_uid).await?;
|
||||
}
|
||||
Command::Move(is_uid) => {
|
||||
//self.handle_copy_move(request, true, is_uid).await?;
|
||||
}
|
||||
Command::Sort(is_uid) => {
|
||||
//self.handle_search(request, true, is_uid).await?;
|
||||
}
|
||||
Command::Thread(is_uid) => {
|
||||
//self.handle_thread(request, is_uid).await?;
|
||||
}
|
||||
Command::Idle => {
|
||||
//self.handle_idle(request).await?;
|
||||
}
|
||||
Command::Subscribe => {
|
||||
//self.handle_subscribe(request, true).await?;
|
||||
}
|
||||
Command::Unsubscribe => {
|
||||
//self.handle_subscribe(request, false).await?;
|
||||
}
|
||||
Command::Namespace => {
|
||||
//self.handle_namespace(request).await?;
|
||||
}
|
||||
Command::Authenticate => {
|
||||
//self.handle_authenticate(request).await?;
|
||||
}
|
||||
Command::Login => {
|
||||
//self.handle_login(request).await?;
|
||||
}
|
||||
Command::Capability => {
|
||||
//self.handle_capability(request).await?;
|
||||
}
|
||||
Command::Enable => {
|
||||
//self.handle_enable(request).await?;
|
||||
}
|
||||
Command::StartTls => {
|
||||
//return self.handle_starttls(request).await;
|
||||
}
|
||||
Command::Noop => {
|
||||
//self.handle_noop(request, false).await?;
|
||||
}
|
||||
Command::Check => {
|
||||
//self.handle_noop(request, true).await?;
|
||||
}
|
||||
Command::Logout => {
|
||||
//self.handle_logout(request).await?;
|
||||
}
|
||||
Command::SetAcl => {
|
||||
//self.handle_set_acl(request).await?;
|
||||
}
|
||||
Command::DeleteAcl => {
|
||||
//self.handle_delete_acl(request).await?;
|
||||
}
|
||||
Command::GetAcl => {
|
||||
//self.handle_get_acl(request).await?;
|
||||
}
|
||||
Command::ListRights => {
|
||||
//self.handle_list_rights(request).await?;
|
||||
}
|
||||
Command::MyRights => {
|
||||
//self.handle_my_rights(request).await?;
|
||||
}
|
||||
Command::Unauthenticate => {
|
||||
//self.handle_unauthenticate(request).await?;
|
||||
}
|
||||
Command::Id => {
|
||||
//self.handle_id(request).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(needs_literal) = needs_literal {
|
||||
self.write_bytes(format!("+ Ready for {} bytes.\r\n", needs_literal).into_bytes())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn group_requests(
|
||||
requests: &mut Peekable<IntoIter<Request<Command>>>,
|
||||
mut grouped_requests: Vec<Request<Command>>,
|
||||
) -> Vec<Request<Command>> {
|
||||
let last_command = grouped_requests.last().unwrap().command;
|
||||
loop {
|
||||
match requests.peek() {
|
||||
Some(request) if request.command == last_command => {
|
||||
grouped_requests.push(requests.next().unwrap());
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
grouped_requests
|
||||
}
|
||||
|
||||
trait IsAllowed: Sized {
|
||||
fn is_allowed(self, state: &State, is_tls: bool) -> Result<Self, StatusResponse>;
|
||||
}
|
||||
|
||||
impl IsAllowed for Request<Command> {
|
||||
fn is_allowed(self, state: &State, is_tls: bool) -> Result<Self, StatusResponse> {
|
||||
match &self.command {
|
||||
Command::Capability | Command::Noop | Command::Logout | Command::Id => Ok(self),
|
||||
Command::StartTls => {
|
||||
if !is_tls {
|
||||
Ok(self)
|
||||
} else {
|
||||
Err(StatusResponse::no("Already in TLS mode.").with_tag(self.tag))
|
||||
}
|
||||
}
|
||||
Command::Authenticate => {
|
||||
if let State::NotAuthenticated { .. } = state {
|
||||
Ok(self)
|
||||
} else {
|
||||
Err(StatusResponse::no("Already authenticated.").with_tag(self.tag))
|
||||
}
|
||||
}
|
||||
Command::Login => {
|
||||
if let State::NotAuthenticated { .. } = state {
|
||||
if is_tls {
|
||||
Ok(self)
|
||||
} else {
|
||||
Err(
|
||||
StatusResponse::no("LOGIN is disabled on the clear-text port.")
|
||||
.with_tag(self.tag),
|
||||
)
|
||||
}
|
||||
} else {
|
||||
Err(StatusResponse::no("Already authenticated.").with_tag(self.tag))
|
||||
}
|
||||
}
|
||||
Command::Enable
|
||||
| Command::Select
|
||||
| Command::Examine
|
||||
| Command::Create
|
||||
| Command::Delete
|
||||
| Command::Rename
|
||||
| Command::Subscribe
|
||||
| Command::Unsubscribe
|
||||
| Command::List
|
||||
| Command::Lsub
|
||||
| Command::Namespace
|
||||
| Command::Status
|
||||
| Command::Append
|
||||
| Command::Idle
|
||||
| Command::SetAcl
|
||||
| Command::DeleteAcl
|
||||
| Command::GetAcl
|
||||
| Command::ListRights
|
||||
| Command::MyRights
|
||||
| Command::Unauthenticate => {
|
||||
if let State::Authenticated { .. } | State::Selected { .. } = state {
|
||||
Ok(self)
|
||||
} else {
|
||||
Err(StatusResponse::no("Not authenticated.").with_tag(self.tag))
|
||||
}
|
||||
}
|
||||
Command::Close
|
||||
| Command::Unselect
|
||||
| Command::Expunge(_)
|
||||
| Command::Search(_)
|
||||
| Command::Fetch(_)
|
||||
| Command::Store(_)
|
||||
| Command::Copy(_)
|
||||
| Command::Move(_)
|
||||
| Command::Check
|
||||
| Command::Sort(_)
|
||||
| Command::Thread(_) => match state {
|
||||
State::Selected { mailbox, .. } => {
|
||||
if mailbox.is_select
|
||||
|| !matches!(
|
||||
self.command,
|
||||
Command::Store(_) | Command::Expunge(_) | Command::Move(_),
|
||||
)
|
||||
{
|
||||
Ok(self)
|
||||
} else {
|
||||
Err(StatusResponse::no("Not permitted in EXAMINE state.")
|
||||
.with_tag(self.tag))
|
||||
}
|
||||
}
|
||||
State::Authenticated { .. } => {
|
||||
Err(StatusResponse::bad("No mailbox is selected.").with_tag(self.tag))
|
||||
}
|
||||
State::NotAuthenticated { .. } => {
|
||||
Err(StatusResponse::no("Not authenticated.").with_tag(self.tag))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub fn auth_failures(&self) -> u32 {
|
||||
match self {
|
||||
State::NotAuthenticated { auth_failures, .. } => *auth_failures,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn session_data(&self) -> Arc<SessionData> {
|
||||
match self {
|
||||
State::Authenticated { data } => data.clone(),
|
||||
State::Selected { data, .. } => data.clone(),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn mailbox_data(&self) -> (Arc<SessionData>, Arc<SelectedMailbox>) {
|
||||
match self {
|
||||
State::Selected { data, mailbox, .. } => (data.clone(), mailbox.clone()),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn session_mailbox_data(&self) -> (Arc<SessionData>, Option<Arc<SelectedMailbox>>) {
|
||||
match self {
|
||||
State::Authenticated { data } => (data.clone(), None),
|
||||
State::Selected { data, mailbox, .. } => (data.clone(), mailbox.clone().into()),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn select_data(&self) -> (Arc<SessionData>, Arc<SelectedMailbox>) {
|
||||
match self {
|
||||
State::Selected { data, mailbox } => (data.clone(), mailbox.clone()),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_authenticated(&self) -> bool {
|
||||
matches!(self, State::Authenticated { .. } | State::Selected { .. })
|
||||
}
|
||||
|
||||
pub fn is_mailbox_selected(&self) -> bool {
|
||||
matches!(self, State::Selected { .. })
|
||||
}
|
||||
}
|
120
crates/imap/src/core/mod.rs
Normal file
120
crates/imap/src/core/mod.rs
Normal file
|
@ -0,0 +1,120 @@
|
|||
use std::{
|
||||
net::{IpAddr, SocketAddr},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use imap_proto::{protocol::ProtocolVersion, receiver::Receiver, Command};
|
||||
use jmap::{
|
||||
auth::{rate_limit::RemoteAddress, AccessToken},
|
||||
JMAP,
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncRead, ReadHalf},
|
||||
sync::{mpsc, watch},
|
||||
};
|
||||
use utils::listener::{limiter::InFlight, ServerInstance};
|
||||
|
||||
pub mod client;
|
||||
pub mod session;
|
||||
pub mod writer;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ImapSessionManager {
|
||||
pub jmap: Arc<JMAP>,
|
||||
pub imap: Arc<IMAP>,
|
||||
}
|
||||
|
||||
impl ImapSessionManager {
|
||||
pub fn new(jmap: Arc<JMAP>, imap: Arc<IMAP>) -> Self {
|
||||
Self { jmap, imap }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct IMAP {
|
||||
pub max_request_size: usize,
|
||||
pub name_shared: String,
|
||||
pub name_all: String,
|
||||
|
||||
pub timeout_auth: Duration,
|
||||
pub timeout_unauth: Duration,
|
||||
|
||||
pub greeting_plain: Vec<u8>,
|
||||
pub greeting_tls: Vec<u8>,
|
||||
}
|
||||
|
||||
pub struct Session<T: AsyncRead> {
|
||||
pub jmap: Arc<JMAP>,
|
||||
pub imap: Arc<IMAP>,
|
||||
pub instance: Arc<ServerInstance>,
|
||||
pub receiver: Receiver<Command>,
|
||||
pub version: ProtocolVersion,
|
||||
pub state: State,
|
||||
pub is_tls: bool,
|
||||
pub is_condstore: bool,
|
||||
pub is_qresync: bool,
|
||||
pub writer: mpsc::Sender<writer::Event>,
|
||||
pub stream_rx: ReadHalf<T>,
|
||||
pub in_flight: Vec<InFlight>,
|
||||
pub span: tracing::Span,
|
||||
}
|
||||
|
||||
pub struct SessionData {
|
||||
pub core: Arc<JMAP>,
|
||||
pub writer: mpsc::Sender<writer::Event>,
|
||||
pub access_token: Arc<AccessToken>,
|
||||
}
|
||||
|
||||
pub struct SelectedMailbox {
|
||||
pub id: MailboxId,
|
||||
pub state: parking_lot::Mutex<MailboxData>,
|
||||
pub saved_search: parking_lot::Mutex<SavedSearch>,
|
||||
pub is_select: bool,
|
||||
pub is_condstore: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub struct MailboxId {
|
||||
pub account_id: u32,
|
||||
pub mailbox_id: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MailboxData {
|
||||
pub uid_next: u32,
|
||||
pub uid_validity: u32,
|
||||
pub jmap_ids: Vec<u32>,
|
||||
pub imap_uids: Vec<u32>,
|
||||
pub total_messages: usize,
|
||||
pub last_state: u32,
|
||||
}
|
||||
|
||||
pub enum SavedSearch {
|
||||
InFlight {
|
||||
rx: watch::Receiver<Arc<Vec<ImapId>>>,
|
||||
},
|
||||
Results {
|
||||
items: Arc<Vec<ImapId>>,
|
||||
},
|
||||
None,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub struct ImapId {
|
||||
pub uid: u32,
|
||||
pub seqnum: u32,
|
||||
}
|
||||
|
||||
pub enum State {
|
||||
NotAuthenticated {
|
||||
remote_addr: RemoteAddress,
|
||||
auth_failures: u32,
|
||||
},
|
||||
Authenticated {
|
||||
data: Arc<SessionData>,
|
||||
},
|
||||
Selected {
|
||||
data: Arc<SessionData>,
|
||||
mailbox: Arc<SelectedMailbox>,
|
||||
},
|
||||
}
|
214
crates/imap/src/core/session.rs
Normal file
214
crates/imap/src/core/session.rs
Normal file
|
@ -0,0 +1,214 @@
|
|||
use imap_proto::{protocol::ProtocolVersion, receiver::Receiver};
|
||||
use jmap::auth::rate_limit::RemoteAddress;
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncReadExt, AsyncWriteExt},
|
||||
net::TcpStream,
|
||||
sync::oneshot,
|
||||
};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use utils::listener::{SessionData, SessionManager};
|
||||
|
||||
use super::{writer, ImapSessionManager, Session, State};
|
||||
|
||||
impl SessionManager for ImapSessionManager {
|
||||
fn spawn(&self, session: SessionData<TcpStream>) {
|
||||
let manager = self.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if session.instance.is_tls_implicit {
|
||||
if let Ok(session) = Session::<TlsStream<TcpStream>>::new(session, manager).await {
|
||||
session.handle_conn().await;
|
||||
}
|
||||
} else if let Ok(session) = Session::<TcpStream>::new(session, manager).await {
|
||||
session.handle_conn().await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn shutdown(&self) {
|
||||
// No-op
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> Session<T> {
|
||||
pub async fn handle_conn_(&mut self) -> bool {
|
||||
let mut buf = vec![0; 8192];
|
||||
let mut shutdown_rx = self.instance.shutdown_rx.clone();
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
result = tokio::time::timeout(
|
||||
if !matches!(self.state, State::NotAuthenticated {..}) {
|
||||
self.imap.timeout_auth
|
||||
} else {
|
||||
self.imap.timeout_unauth
|
||||
},
|
||||
self.stream_rx.read(&mut buf)) => {
|
||||
match result {
|
||||
Ok(Ok(bytes_read)) => {
|
||||
if bytes_read > 0 {
|
||||
match self.ingest(&buf[..bytes_read]).await {
|
||||
Ok(false) => (),
|
||||
Ok(true) => {
|
||||
return true;
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::debug!(parent: &self.span, event = "disconnect", "Disconnecting client.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(parent: &self.span, event = "close", "IMAP connection closed by client.");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Ok(Err(err)) => {
|
||||
tracing::debug!(parent: &self.span, event = "error", reason = %err, "IMAP connection error.");
|
||||
break;
|
||||
},
|
||||
Err(_) => {
|
||||
self.write_bytes(&b"* BYE Connection timed out.\r\n"[..]).await.ok();
|
||||
tracing::debug!(parent: &self.span, "IMAP connection timed out.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
_ = shutdown_rx.changed() => {
|
||||
self.write_bytes(&b"* BYE Server shutting down.\r\n"[..]).await.ok();
|
||||
tracing::debug!(parent: &self.span, event = "shutdown", "IMAP server shutting down.");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
impl Session<TcpStream> {
|
||||
pub async fn new(
|
||||
mut session: utils::listener::SessionData<TcpStream>,
|
||||
manager: ImapSessionManager,
|
||||
) -> Result<Session<TcpStream>, ()> {
|
||||
// Write plain text greeting
|
||||
if let Err(err) = session.stream.write_all(&manager.imap.greeting_plain).await {
|
||||
tracing::debug!(parent: &session.span, event = "error", reason = %err, "Failed to write greeting.");
|
||||
return Err(());
|
||||
}
|
||||
|
||||
// Split stream into read and write halves
|
||||
let (stream_rx, stream_tx) = tokio::io::split(session.stream);
|
||||
|
||||
Ok(Session {
|
||||
receiver: Receiver::with_max_request_size(manager.imap.max_request_size),
|
||||
version: ProtocolVersion::Rev1,
|
||||
state: State::NotAuthenticated {
|
||||
auth_failures: 0,
|
||||
remote_addr: RemoteAddress::IpAddress(session.remote_ip),
|
||||
},
|
||||
writer: writer::spawn_writer(writer::Event::Stream(stream_tx)),
|
||||
is_tls: false,
|
||||
is_condstore: false,
|
||||
is_qresync: false,
|
||||
imap: manager.imap,
|
||||
jmap: manager.jmap,
|
||||
instance: session.instance,
|
||||
span: session.span,
|
||||
in_flight: vec![session.in_flight],
|
||||
stream_rx,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn handle_conn(mut self) {
|
||||
if self.handle_conn_().await && self.instance.tls_acceptor.is_some() {
|
||||
if let Ok(session) = self.into_tls().await {
|
||||
session.handle_conn().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn into_tls(self) -> Result<Session<TlsStream<TcpStream>>, ()> {
|
||||
// Recover WriteHalf from writer
|
||||
let (tx, rx) = oneshot::channel();
|
||||
if let Err(err) = self.writer.send(writer::Event::Upgrade(tx)).await {
|
||||
tracing::debug!("Failed to write to channel: {}", err);
|
||||
return Err(());
|
||||
}
|
||||
let stream = if let Ok(stream_tx) = rx.await {
|
||||
self.stream_rx.unsplit(stream_tx)
|
||||
} else {
|
||||
tracing::debug!("Failed to read from channel");
|
||||
return Err(());
|
||||
};
|
||||
|
||||
// Upgrade to TLS
|
||||
let (stream_rx, stream_tx) =
|
||||
tokio::io::split(self.instance.tls_accept(stream, &self.span).await?);
|
||||
if let Err(err) = self.writer.send(writer::Event::StreamTls(stream_tx)).await {
|
||||
tracing::debug!("Failed to send stream: {}", err);
|
||||
return Err(());
|
||||
}
|
||||
|
||||
Ok(Session {
|
||||
jmap: self.jmap,
|
||||
imap: self.imap,
|
||||
instance: self.instance,
|
||||
receiver: self.receiver,
|
||||
version: self.version,
|
||||
state: self.state,
|
||||
is_tls: true,
|
||||
is_condstore: self.is_condstore,
|
||||
is_qresync: self.is_qresync,
|
||||
writer: self.writer,
|
||||
span: self.span,
|
||||
in_flight: self.in_flight,
|
||||
stream_rx,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Session<TlsStream<TcpStream>> {
|
||||
pub async fn new(
|
||||
session: utils::listener::SessionData<TcpStream>,
|
||||
manager: ImapSessionManager,
|
||||
) -> Result<Session<TlsStream<TcpStream>>, ()> {
|
||||
// Upgrade to TLS
|
||||
let mut stream = session
|
||||
.instance
|
||||
.tls_accept(session.stream, &session.span)
|
||||
.await?;
|
||||
|
||||
// Write TLS greeting
|
||||
let span = session.span;
|
||||
if let Err(err) = stream.write_all(&manager.imap.greeting_tls).await {
|
||||
tracing::debug!(parent: &span, event = "error", reason = %err, "Failed to write greeting.");
|
||||
return Err(());
|
||||
}
|
||||
|
||||
// Spit stream into read and write halves
|
||||
let (stream_rx, stream_tx) = tokio::io::split(stream);
|
||||
|
||||
Ok(Session {
|
||||
receiver: Receiver::with_max_request_size(manager.imap.max_request_size),
|
||||
version: ProtocolVersion::Rev1,
|
||||
state: State::NotAuthenticated {
|
||||
auth_failures: 0,
|
||||
remote_addr: RemoteAddress::IpAddress(session.remote_ip),
|
||||
},
|
||||
writer: writer::spawn_writer(writer::Event::StreamTls(stream_tx)),
|
||||
is_tls: true,
|
||||
is_condstore: false,
|
||||
is_qresync: false,
|
||||
imap: manager.imap,
|
||||
jmap: manager.jmap,
|
||||
instance: session.instance,
|
||||
span,
|
||||
in_flight: vec![session.in_flight],
|
||||
stream_rx,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn handle_conn(mut self) {
|
||||
self.handle_conn_().await;
|
||||
}
|
||||
}
|
143
crates/imap/src/core/writer.rs
Normal file
143
crates/imap/src/core/writer.rs
Normal file
|
@ -0,0 +1,143 @@
|
|||
/*
|
||||
* Copyright (c) 2020-2022, Stalwart Labs Ltd.
|
||||
*
|
||||
* This file is part of the Stalwart IMAP Server.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of
|
||||
* the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
* in the LICENSE file at the top-level directory of this distribution.
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* You can be released from the requirements of the AGPLv3 license by
|
||||
* purchasing a commercial license. Please contact licensing@stalw.art
|
||||
* for more details.
|
||||
*/
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWriteExt, WriteHalf},
|
||||
net::TcpStream,
|
||||
sync::{mpsc, oneshot},
|
||||
};
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use tracing::debug;
|
||||
|
||||
use super::{Session, SessionData};
|
||||
|
||||
const IPC_CHANNEL_BUFFER: usize = 128;
|
||||
|
||||
pub enum Event {
|
||||
Stream(WriteHalf<TcpStream>),
|
||||
StreamTls(WriteHalf<TlsStream<TcpStream>>),
|
||||
Bytes(Cow<'static, [u8]>),
|
||||
Upgrade(oneshot::Sender<WriteHalf<TcpStream>>),
|
||||
}
|
||||
|
||||
pub fn spawn_writer(mut stream: Event) -> mpsc::Sender<Event> {
|
||||
let (tx, mut rx) = mpsc::channel::<Event>(IPC_CHANNEL_BUFFER);
|
||||
tokio::spawn(async move {
|
||||
'outer: loop {
|
||||
match stream {
|
||||
Event::Stream(mut stream_tx) => {
|
||||
while let Some(event) = rx.recv().await {
|
||||
match event {
|
||||
Event::Bytes(bytes) => {
|
||||
/*let tmp = "dd";
|
||||
println!(
|
||||
"<- {:?}",
|
||||
String::from_utf8_lossy(
|
||||
&bytes[..std::cmp::min(bytes.len(), 100)]
|
||||
)
|
||||
);*/
|
||||
|
||||
if let Err(err) = stream_tx.write_all(bytes.as_ref()).await {
|
||||
debug!("Failed to write to stream: {}", err);
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
Event::Upgrade(channel) => {
|
||||
if channel.send(stream_tx).is_err() {
|
||||
debug!("Failed to send stream.");
|
||||
break 'outer;
|
||||
}
|
||||
if let Some(stream_) = rx.recv().await {
|
||||
stream = stream_;
|
||||
continue 'outer;
|
||||
} else {
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
stream = event;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
break 'outer;
|
||||
}
|
||||
Event::StreamTls(mut stream_tx) => {
|
||||
while let Some(event) = rx.recv().await {
|
||||
match event {
|
||||
Event::Bytes(bytes) => {
|
||||
if let Err(err) = stream_tx.write_all(bytes.as_ref()).await {
|
||||
debug!("Failed to write to stream: {}", err);
|
||||
break 'outer;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
stream = event;
|
||||
continue 'outer;
|
||||
}
|
||||
}
|
||||
}
|
||||
break 'outer;
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
tx
|
||||
}
|
||||
|
||||
impl<T: AsyncRead> Session<T> {
|
||||
pub async fn write_bytes(&self, bytes: impl Into<Cow<'static, [u8]>>) -> Result<(), ()> {
|
||||
/*let tmp = "dd";
|
||||
println!(
|
||||
"-> {:?}",
|
||||
String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)])
|
||||
);*/
|
||||
|
||||
if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await {
|
||||
debug!("Failed to send bytes: {}", err);
|
||||
Err(())
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SessionData {
|
||||
pub async fn write_bytes(&self, bytes: impl Into<Cow<'static, [u8]>>) -> bool {
|
||||
/*let tmp = "dd";
|
||||
println!(
|
||||
"-> {:?}",
|
||||
String::from_utf8_lossy(&bytes[..std::cmp::min(bytes.len(), 100)])
|
||||
);*/
|
||||
|
||||
if let Err(err) = self.writer.send(Event::Bytes(bytes.into())).await {
|
||||
debug!("Failed to send bytes: {}", err);
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use crate::core::IMAP;
|
||||
|
||||
use directory::DirectoryConfig;
|
||||
use imap_proto::{protocol::capability::Capability, ResponseCode, StatusResponse};
|
||||
use utils::config::Config;
|
||||
|
||||
pub mod core;
|
||||
|
||||
static SERVER_GREETING: &str = concat!(
|
||||
"Stalwart IMAP4rev2 v",
|
||||
env!("CARGO_PKG_VERSION"),
|
||||
" at your service."
|
||||
);
|
||||
|
||||
impl IMAP {
|
||||
pub async fn init(config: &Config, directory: &DirectoryConfig) -> Result<Arc<Self>, String> {
|
||||
Ok(Arc::new(IMAP {
|
||||
max_request_size: config.property_or_static("imap.request.max-size", "52428800")?,
|
||||
name_shared: config
|
||||
.value("imap.folders.name.shared")
|
||||
.unwrap_or("Shared Folders")
|
||||
.to_string(),
|
||||
name_all: config
|
||||
.value("imap.folders.name.all")
|
||||
.unwrap_or("All Mail")
|
||||
.to_string(),
|
||||
greeting_plain: StatusResponse::ok(SERVER_GREETING)
|
||||
.with_code(ResponseCode::Capability {
|
||||
capabilities: Capability::all_capabilities(false, false),
|
||||
})
|
||||
.into_bytes(),
|
||||
timeout_auth: config.property_or_static("imap.timeout.authenticated", "30m")?,
|
||||
timeout_unauth: config.property_or_static("imap.timeout.anonymous", "1m")?,
|
||||
greeting_tls: StatusResponse::ok(SERVER_GREETING)
|
||||
.with_code(ResponseCode::Capability {
|
||||
capabilities: Capability::all_capabilities(false, true),
|
||||
})
|
||||
.into_bytes(),
|
||||
}))
|
||||
}
|
||||
}
|
|
@ -85,35 +85,7 @@ impl Session<TcpStream> {
|
|||
pub async fn into_tls(self) -> Result<Session<TlsStream<TcpStream>>, ()> {
|
||||
let span = self.span;
|
||||
Ok(Session {
|
||||
stream: match self
|
||||
.instance
|
||||
.tls_acceptor
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.accept(self.stream)
|
||||
.await
|
||||
{
|
||||
Ok(stream) => {
|
||||
tracing::info!(
|
||||
parent: &span,
|
||||
context = "tls",
|
||||
event = "handshake",
|
||||
version = ?stream.get_ref().1.protocol_version().unwrap_or(rustls::ProtocolVersion::TLSv1_3),
|
||||
cipher = ?stream.get_ref().1.negotiated_cipher_suite().unwrap_or(rustls::cipher_suite::TLS13_AES_128_GCM_SHA256),
|
||||
);
|
||||
stream
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::debug!(
|
||||
parent: &span,
|
||||
context = "tls",
|
||||
event = "error",
|
||||
"Failed to accept TLS connection: {}",
|
||||
err
|
||||
);
|
||||
return Err(());
|
||||
}
|
||||
},
|
||||
stream: self.instance.tls_accept(self.stream, &span).await?,
|
||||
state: self.state,
|
||||
data: self.data,
|
||||
instance: self.instance,
|
||||
|
|
|
@ -23,8 +23,12 @@
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::{net::TcpListener, sync::watch};
|
||||
use tokio_rustls::TlsAcceptor;
|
||||
use tokio::{
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::watch,
|
||||
};
|
||||
use tokio_rustls::{server::TlsStream, TlsAcceptor};
|
||||
use tracing::Span;
|
||||
|
||||
use crate::{
|
||||
config::{Config, Listener, Server, ServerProtocol, Servers},
|
||||
|
@ -186,3 +190,34 @@ impl Listener {
|
|||
listener
|
||||
}
|
||||
}
|
||||
|
||||
impl ServerInstance {
|
||||
pub async fn tls_accept(
|
||||
&self,
|
||||
stream: TcpStream,
|
||||
span: &Span,
|
||||
) -> Result<TlsStream<TcpStream>, ()> {
|
||||
match self.tls_acceptor.as_ref().unwrap().accept(stream).await {
|
||||
Ok(stream) => {
|
||||
tracing::info!(
|
||||
parent: span,
|
||||
context = "tls",
|
||||
event = "handshake",
|
||||
version = ?stream.get_ref().1.protocol_version().unwrap_or(rustls::ProtocolVersion::TLSv1_3),
|
||||
cipher = ?stream.get_ref().1.negotiated_cipher_suite().unwrap_or(rustls::cipher_suite::TLS13_AES_128_GCM_SHA256),
|
||||
);
|
||||
Ok(stream)
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::debug!(
|
||||
parent: span,
|
||||
context = "tls",
|
||||
event = "error",
|
||||
"Failed to accept TLS connection: {}",
|
||||
err
|
||||
);
|
||||
Err(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue