From 1c460c7f3b272608b29a93ee073241cb6b2acfbc Mon Sep 17 00:00:00 2001 From: mdecimus Date: Sun, 2 Mar 2025 16:32:11 +0100 Subject: [PATCH] DAV skeleton --- Cargo.lock | 20 ++ crates/dav/Cargo.toml | 6 + crates/dav/src/lib.rs | 87 +++++ crates/dav/src/request.rs | 38 +++ crates/http-proto/src/lib.rs | 10 +- crates/http-proto/src/response.rs | 236 ++++++++------ crates/http/Cargo.toml | 1 + .../src/management/enterprise/telemetry.rs | 297 +++++++++--------- crates/http/src/management/troubleshoot.rs | 23 +- crates/http/src/request.rs | 60 +++- crates/jmap/src/api/event_source.rs | 13 +- crates/jmap/src/api/mod.rs | 8 +- crates/jmap/src/api/request.rs | 4 +- crates/jmap/src/websocket/stream.rs | 2 +- crates/jmap/src/websocket/upgrade.rs | 8 +- 15 files changed, 515 insertions(+), 298 deletions(-) create mode 100644 crates/dav/src/request.rs diff --git a/Cargo.lock b/Cargo.lock index 64c0d32d..3d23ee58 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1669,6 +1669,25 @@ checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010" [[package]] name = "dav" version = "0.11.5" +dependencies = [ + "common", + "dav-proto", + "groupware", + "hashify", + "http_proto", + "hyper 1.6.0", +] + +[[package]] +name = "dav-proto" +version = "0.1.0" +dependencies = [ + "calcard", + "hashify", + "hyper 1.6.0", + "mail-parser", + "quick-xml 0.37.2", +] [[package]] name = "dbl" @@ -2988,6 +3007,7 @@ dependencies = [ "base64 0.22.1", "chrono", "common", + "dav", "directory", "email", "form-data", diff --git a/crates/dav/Cargo.toml b/crates/dav/Cargo.toml index 58d14eed..995ca3d6 100644 --- a/crates/dav/Cargo.toml +++ b/crates/dav/Cargo.toml @@ -5,6 +5,12 @@ edition = "2024" resolver = "2" [dependencies] +dav-proto = { path = "/Users/me/code/dav-proto" } +common = { path = "../common" } +groupware = { path = "../groupware" } +http_proto = { path = "../http-proto" } +hashify = { version = "0.2" } +hyper = { version = "1.0.1", features = ["server", "http1", "http2"] } [dev-dependencies] diff --git a/crates/dav/src/lib.rs b/crates/dav/src/lib.rs index c8a832f8..4c2fd2ae 100644 --- a/crates/dav/src/lib.rs +++ b/crates/dav/src/lib.rs @@ -3,3 +3,90 @@ * * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ + +pub mod request; + +use http_proto::HttpResponse; +use hyper::{Method, StatusCode}; + +#[derive(Debug, Clone, Copy)] +pub enum DavResource { + Card, + Cal, + File, +} + +#[derive(Debug, Clone, Copy)] +pub enum DavMethod { + GET, + PUT, + POST, + DELETE, + PATCH, + PROPFIND, + PROPPATCH, + REPORT, + MKCOL, + COPY, + MOVE, + LOCK, + UNLOCK, + OPTIONS, +} + +impl DavResource { + pub fn parse(service: &str) -> Option { + hashify::tiny_map!(service.as_bytes(), + "card" => DavResource::Card, + "cal" => DavResource::Cal, + "file" => DavResource::File + ) + } + + pub fn into_options_response(self) -> HttpResponse { + let todo = "true"; + HttpResponse::new(StatusCode::OK) + .with_header("DAV", "1, 2, 3, access-control, calendar-access") + } +} + +impl DavMethod { + pub fn parse(method: &Method) -> Option { + match *method { + Method::GET => Some(DavMethod::GET), + Method::PUT => Some(DavMethod::PUT), + Method::DELETE => Some(DavMethod::DELETE), + Method::OPTIONS => Some(DavMethod::OPTIONS), + Method::POST => Some(DavMethod::POST), + Method::PATCH => Some(DavMethod::PATCH), + _ => { + hashify::tiny_map!(method.as_str().as_bytes(), + "PROPFIND" => DavMethod::PROPFIND, + "PROPPATCH" => DavMethod::PROPPATCH, + "REPORT" => DavMethod::REPORT, + "MKCOL" => DavMethod::MKCOL, + "COPY" => DavMethod::COPY, + "MOVE" => DavMethod::MOVE, + "LOCK" => DavMethod::LOCK, + "UNLOCK" => DavMethod::UNLOCK + ) + } + } + } + + #[inline] + pub fn has_body(self) -> bool { + matches!( + self, + DavMethod::PUT + | DavMethod::POST + | DavMethod::PATCH + | DavMethod::PROPPATCH + | DavMethod::PROPFIND + | DavMethod::REPORT + | DavMethod::MKCOL + | DavMethod::COPY + | DavMethod::MOVE + ) + } +} diff --git a/crates/dav/src/request.rs b/crates/dav/src/request.rs new file mode 100644 index 00000000..458cfcb7 --- /dev/null +++ b/crates/dav/src/request.rs @@ -0,0 +1,38 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + +use std::sync::Arc; + +use common::{Server, auth::AccessToken}; +use http_proto::{HttpRequest, HttpResponse, HttpSessionData}; + +use crate::{DavMethod, DavResource}; + +pub trait DavRequestHandler: Sync + Send { + fn handle_dav_request( + &self, + request: HttpRequest, + access_token: Arc, + session: &HttpSessionData, + resource: DavResource, + method: DavMethod, + body: Vec, + ) -> impl Future + Send; +} + +impl DavRequestHandler for Server { + async fn handle_dav_request( + &self, + request: HttpRequest, + access_token: Arc, + session: &HttpSessionData, + resource: DavResource, + method: DavMethod, + body: Vec, + ) -> HttpResponse { + todo!() + } +} diff --git a/crates/http-proto/src/lib.rs b/crates/http-proto/src/lib.rs index dfd05999..5b99f6fd 100644 --- a/crates/http-proto/src/lib.rs +++ b/crates/http-proto/src/lib.rs @@ -10,7 +10,7 @@ pub mod response; pub use form_urlencoded; -use std::{borrow::Cow, net::IpAddr, sync::Arc}; +use std::{net::IpAddr, sync::Arc}; use common::listener::ServerInstance; use hyper::StatusCode; @@ -37,11 +37,9 @@ pub enum HttpResponseBody { } pub struct HttpResponse { - pub status: StatusCode, - pub content_type: Cow<'static, str>, - pub content_disposition: Cow<'static, str>, - pub cache_control: Cow<'static, str>, - pub body: HttpResponseBody, + status: StatusCode, + builder: hyper::http::response::Builder, + body: HttpResponseBody, } pub struct HttpContext<'x> { diff --git a/crates/http-proto/src/response.rs b/crates/http-proto/src/response.rs index 2bc77a32..7136ad7c 100644 --- a/crates/http-proto/src/response.rs +++ b/crates/http-proto/src/response.rs @@ -4,11 +4,13 @@ * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL */ - use std::borrow::Cow; - use common::manager::webadmin::Resource; use http_body_util::{BodyExt, Full}; -use hyper::{StatusCode, body::Bytes, header}; +use hyper::{ + StatusCode, + body::Bytes, + header::{self, HeaderName, HeaderValue}, +}; use serde_json::json; use crate::{ @@ -17,42 +19,91 @@ use crate::{ }; impl HttpResponse { - pub fn new_empty(status: StatusCode) -> Self { + pub fn new(status: StatusCode) -> Self { HttpResponse { status, - content_type: "".into(), - content_disposition: "".into(), - cache_control: "".into(), + builder: hyper::Response::builder().status(status), body: HttpResponseBody::Empty, } } - pub fn new_text( - status: StatusCode, - content_type: impl Into>, - body: impl Into, - ) -> Self { - HttpResponse { - status, - content_type: content_type.into(), - content_disposition: "".into(), - cache_control: "".into(), - body: HttpResponseBody::Text(body.into()), - } + pub fn with_content_type(mut self, content_type: V) -> Self + where + V: TryInto, + >::Error: Into, + { + self.builder = self.builder.header(header::CONTENT_TYPE, content_type); + self } - pub fn new_binary( - status: StatusCode, - content_type: impl Into>, - body: impl Into>, + pub fn with_header(mut self, name: K, value: V) -> Self + where + K: TryInto, + >::Error: Into, + V: TryInto, + >::Error: Into, + { + self.builder = self.builder.header(name, value); + self + } + + pub fn with_text_body(mut self, body: impl Into) -> Self { + self.body = HttpResponseBody::Text(body.into()); + self + } + + pub fn with_binary_body(mut self, body: impl Into>) -> Self { + self.body = HttpResponseBody::Binary(body.into()); + self + } + + pub fn with_stream_body( + mut self, + stream: http_body_util::combinators::BoxBody, ) -> Self { - HttpResponse { - status, - content_type: content_type.into(), - content_disposition: "".into(), - cache_control: "".into(), - body: HttpResponseBody::Binary(body.into()), - } + self.body = HttpResponseBody::Stream(stream); + self + } + + pub fn with_websocket_upgrade(mut self, derived_key: String) -> Self { + self.body = HttpResponseBody::WebsocketUpgrade(derived_key); + self + } + + pub fn with_content_disposition(mut self, content_disposition: V) -> Self + where + V: TryInto, + >::Error: Into, + { + self.builder = self + .builder + .header(header::CONTENT_DISPOSITION, content_disposition); + self + } + + pub fn with_cache_control(mut self, cache_control: V) -> Self + where + V: TryInto, + >::Error: Into, + { + self.builder = self.builder.header(header::CACHE_CONTROL, cache_control); + self + } + + pub fn with_no_cache(mut self) -> Self { + self.builder = self + .builder + .header(header::CACHE_CONTROL, "no-store, no-cache, must-revalidate"); + self + } + + pub fn with_location(mut self, location: V) -> Self + where + V: TryInto, + >::Error: Into, + { + self.builder = self.builder.header(header::LOCATION, location); + self } pub fn size(&self) -> usize { @@ -67,46 +118,25 @@ impl HttpResponse { self, ) -> hyper::Response> { - let builder = hyper::Response::builder().status(self.status); - match self.body { - HttpResponseBody::Text(body) => builder - .header(header::CONTENT_TYPE, self.content_type.as_ref()) - .body( - Full::new(Bytes::from(body)) - .map_err(|never| match never {}) - .boxed(), - ), - HttpResponseBody::Binary(body) => { - let mut builder = builder.header(header::CONTENT_TYPE, self.content_type.as_ref()); - - if !self.content_disposition.is_empty() { - builder = builder.header( - header::CONTENT_DISPOSITION, - self.content_disposition.as_ref(), - ); - } - - if !self.cache_control.is_empty() { - builder = builder.header(header::CACHE_CONTROL, self.cache_control.as_ref()); - } - - builder.body( - Full::new(Bytes::from(body)) - .map_err(|never| match never {}) - .boxed(), - ) - } - HttpResponseBody::Empty => builder.body( + HttpResponseBody::Text(body) => self.builder.body( + Full::new(Bytes::from(body)) + .map_err(|never| match never {}) + .boxed(), + ), + HttpResponseBody::Binary(body) => self.builder.body( + Full::new(Bytes::from(body)) + .map_err(|never| match never {}) + .boxed(), + ), + HttpResponseBody::Empty => self.builder.body( Full::new(Bytes::new()) .map_err(|never| match never {}) .boxed(), ), - HttpResponseBody::Stream(stream) => builder - .header(header::CONTENT_TYPE, self.content_type.as_ref()) - .header(header::CACHE_CONTROL, self.cache_control.as_ref()) - .body(stream), - HttpResponseBody::WebsocketUpgrade(derived_key) => builder + HttpResponseBody::Stream(stream) => self.builder.body(stream), + HttpResponseBody::WebsocketUpgrade(derived_key) => self + .builder .header(header::CONNECTION, "upgrade") .header(header::UPGRADE, "websocket") .header("Sec-WebSocket-Accept", &derived_key) @@ -119,67 +149,73 @@ impl HttpResponse { } .unwrap() } + + pub fn body(&self) -> &HttpResponseBody { + &self.body + } + + pub fn status(&self) -> StatusCode { + self.status + } } impl ToHttpResponse for JsonResponse { fn into_http_response(self) -> HttpResponse { - HttpResponse { - status: self.status, - content_type: "application/json; charset=utf-8".into(), - content_disposition: "".into(), - cache_control: if !self.no_cache { - "" - } else { - "no-store, no-cache, must-revalidate" - } - .into(), - body: HttpResponseBody::Text(serde_json::to_string(&self.inner).unwrap_or_default()), + let response = HttpResponse::new(self.status) + .with_content_type("application/json; charset=utf-8") + .with_text_body(serde_json::to_string(&self.inner).unwrap_or_default()); + + if self.no_cache { + response.with_no_cache() + } else { + response } } } impl ToHttpResponse for DownloadResponse { fn into_http_response(self) -> HttpResponse { - HttpResponse { - status: StatusCode::OK, - content_type: self.content_type.into(), - content_disposition: format!( + HttpResponse::new(StatusCode::OK) + .with_content_type(self.content_type) + .with_content_disposition(format!( "attachment; filename=\"{}\"", self.filename.replace('\"', "\\\"") - ) - .into(), - cache_control: "private, immutable, max-age=31536000".into(), - body: HttpResponseBody::Binary(self.blob), - } + )) + .with_cache_control("private, immutable, max-age=31536000") + .with_binary_body(self.blob) } } impl ToHttpResponse for Resource> { fn into_http_response(self) -> HttpResponse { - HttpResponse::new_binary(StatusCode::OK, self.content_type, self.contents) + HttpResponse::new(StatusCode::OK) + .with_content_type(self.content_type.as_ref()) + .with_binary_body(self.contents) } } impl ToHttpResponse for HtmlResponse { fn into_http_response(self) -> HttpResponse { - HttpResponse::new_text(self.status, "text/html; charset=utf-8", self.body) + HttpResponse::new(self.status) + .with_content_type("text/html; charset=utf-8") + .with_text_body(self.body) } } impl ToHttpResponse for JsonProblemResponse { fn into_http_response(self) -> HttpResponse { - HttpResponse::new_text( - self.0, - "application/problem+json", - serde_json::to_string(&json!( - { - "type": "about:blank", - "title": self.0.canonical_reason().unwrap_or_default(), - "status": self.0.as_u16(), - "detail": self.0.canonical_reason().unwrap_or_default(), - } - )) - .unwrap_or_default(), - ) + HttpResponse::new(self.0) + .with_content_type("application/problem+json") + .with_text_body( + serde_json::to_string(&json!( + { + "type": "about:blank", + "title": self.0.canonical_reason().unwrap_or_default(), + "status": self.0.as_u16(), + "detail": self.0.canonical_reason().unwrap_or_default(), + } + )) + .unwrap_or_default(), + ) } } diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml index 7855521c..e0022096 100644 --- a/crates/http/Cargo.toml +++ b/crates/http/Cargo.toml @@ -12,6 +12,7 @@ trc = { path = "../trc" } email = { path = "../email" } smtp = { path = "../smtp" } jmap = { path = "../jmap" } +dav = { path = "../dav" } spam-filter = { path = "../spam-filter" } http_proto = { path = "../http-proto" } jmap_proto = { path = "../jmap-proto" } diff --git a/crates/http/src/management/enterprise/telemetry.rs b/crates/http/src/management/enterprise/telemetry.rs index c054ee20..0aed25cf 100644 --- a/crates/http/src/management/enterprise/telemetry.rs +++ b/crates/http/src/management/enterprise/telemetry.rs @@ -203,107 +203,104 @@ impl TelemetryApi for Server { let mut events = Vec::new(); let mut active_span_ids = AHashSet::new(); - Ok(HttpResponse { - status: StatusCode::OK, - content_type: "text/event-stream".into(), - content_disposition: "".into(), - cache_control: "no-store".into(), - body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new( - async_stream::stream! { - let mut last_message = Instant::now() - throttle; - let mut timeout = ping_interval; + Ok(HttpResponse::new(StatusCode::OK) + .with_content_type("text/event-stream") + .with_cache_control("no-store") + .with_stream_body(BoxBody::new(StreamBody::new( + async_stream::stream! { + let mut last_message = Instant::now() - throttle; + let mut timeout = ping_interval; - loop { - match tokio::time::timeout(timeout, rx.recv()).await { - Ok(Some(event_batch)) => { - for event in event_batch { - if (filter.is_none() && key_filters.is_empty()) - || event - .span_id() - .is_some_and(|span_id| active_span_ids.contains(&span_id)) + loop { + match tokio::time::timeout(timeout, rx.recv()).await { + Ok(Some(event_batch)) => { + for event in event_batch { + if (filter.is_none() && key_filters.is_empty()) + || event + .span_id() + .is_some_and(|span_id| active_span_ids.contains(&span_id)) + { + events.push(event); + } else { + let mut matched_keys = AHashSet::new(); + for (key, value) in event + .keys + .iter() + .chain(event.inner.span.as_ref().map_or(([]).iter(), |s| s.keys.iter())) { - events.push(event); - } else { - let mut matched_keys = AHashSet::new(); - for (key, value) in event - .keys - .iter() - .chain(event.inner.span.as_ref().map_or(([]).iter(), |s| s.keys.iter())) - { - if let Some(needle) = key_filters.get(key).or(filter.as_ref()) { - let matches = match value { - Value::Static(haystack) => haystack.contains(needle), - Value::String(haystack) => haystack.contains(needle), - Value::Timestamp(haystack) => { - DateTime::from_timestamp(*haystack as i64) - .to_rfc3339() - .contains(needle) - } - Value::Bool(true) => needle == "true", - Value::Bool(false) => needle == "false", - Value::Ipv4(haystack) => haystack.to_string().contains(needle), - Value::Ipv6(haystack) => haystack.to_string().contains(needle), - Value::Event(_) | - Value::Array(_) | - Value::UInt(_) | - Value::Int(_) | - Value::Float(_) | - Value::Duration(_) | - Value::Bytes(_) | - Value::None => false, - }; + if let Some(needle) = key_filters.get(key).or(filter.as_ref()) { + let matches = match value { + Value::Static(haystack) => haystack.contains(needle), + Value::String(haystack) => haystack.contains(needle), + Value::Timestamp(haystack) => { + DateTime::from_timestamp(*haystack as i64) + .to_rfc3339() + .contains(needle) + } + Value::Bool(true) => needle == "true", + Value::Bool(false) => needle == "false", + Value::Ipv4(haystack) => haystack.to_string().contains(needle), + Value::Ipv6(haystack) => haystack.to_string().contains(needle), + Value::Event(_) | + Value::Array(_) | + Value::UInt(_) | + Value::Int(_) | + Value::Float(_) | + Value::Duration(_) | + Value::Bytes(_) | + Value::None => false, + }; - if matches { - matched_keys.insert(*key); - if filter.is_some() || matched_keys.len() == key_filters.len() { - if let Some(span_id) = event.span_id() { - active_span_ids.insert(span_id); - } - events.push(event); - break; + if matches { + matched_keys.insert(*key); + if filter.is_some() || matched_keys.len() == key_filters.len() { + if let Some(span_id) = event.span_id() { + active_span_ids.insert(span_id); } + events.push(event); + break; } } } } } } - Ok(None) => { - break; - } - Err(_) => (), } - - timeout = if !events.is_empty() { - let elapsed = last_message.elapsed(); - if elapsed >= throttle { - last_message = Instant::now(); - yield Ok(Frame::data(Bytes::from(format!( - "event: trace\ndata: {}\n\n", - serde_json::to_string( - &JsonEventSerializer::new(std::mem::take(&mut events)) - .with_description() - .with_explanation()).unwrap_or_default() - )))); - - ping_interval - } else { - throttle - elapsed - } - } else { - let elapsed = last_ping.elapsed(); - if elapsed >= ping_interval { - last_ping = Instant::now(); - yield Ok(Frame::data(ping_payload.clone())); - ping_interval - } else { - ping_interval - elapsed - } - }; + Ok(None) => { + break; + } + Err(_) => (), } - }, - ))), - }) + + timeout = if !events.is_empty() { + let elapsed = last_message.elapsed(); + if elapsed >= throttle { + last_message = Instant::now(); + yield Ok(Frame::data(Bytes::from(format!( + "event: trace\ndata: {}\n\n", + serde_json::to_string( + &JsonEventSerializer::new(std::mem::take(&mut events)) + .with_description() + .with_explanation()).unwrap_or_default() + )))); + + ping_interval + } else { + throttle - elapsed + } + } else { + let elapsed = last_ping.elapsed(); + if elapsed >= ping_interval { + last_ping = Instant::now(); + yield Ok(Frame::data(ping_payload.clone())); + ping_interval + } else { + ping_interval - elapsed + } + }; + } + }, + )))) } ("trace", id, &Method::GET) => { // Validate the access token @@ -474,73 +471,69 @@ impl TelemetryApi for Server { } } - Ok(HttpResponse { - status: StatusCode::OK, - content_type: "text/event-stream".into(), - content_disposition: "".into(), - cache_control: "no-store".into(), - body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new( - async_stream::stream! { + Ok(HttpResponse::new(StatusCode::OK) + .with_content_type("text/event-stream") + .with_cache_control("no-store") + .with_stream_body(BoxBody::new(StreamBody::new( + async_stream::stream! { + loop { + let mut metrics = String::with_capacity(512); + metrics.push_str("event: metrics\ndata: ["); + let mut is_first = true; - loop { - let mut metrics = String::with_capacity(512); - metrics.push_str("event: metrics\ndata: ["); - let mut is_first = true; - - for counter in Collector::collect_counters(true) { - if event_types.is_empty() || event_types.contains(&counter.id()) { - if !is_first { - metrics.push(','); - } else { - is_first = false; - } - let _ = write!( - &mut metrics, - "{{\"id\":\"{}\",\"type\":\"counter\",\"value\":{}}}", - counter.id().name(), - counter.value() - ); + for counter in Collector::collect_counters(true) { + if event_types.is_empty() || event_types.contains(&counter.id()) { + if !is_first { + metrics.push(','); + } else { + is_first = false; } + let _ = write!( + &mut metrics, + "{{\"id\":\"{}\",\"type\":\"counter\",\"value\":{}}}", + counter.id().name(), + counter.value() + ); } - for gauge in Collector::collect_gauges(true) { - if metric_types.is_empty() || metric_types.contains(&gauge.id()) { - if !is_first { - metrics.push(','); - } else { - is_first = false; - } - let _ = write!( - &mut metrics, - "{{\"id\":\"{}\",\"type\":\"gauge\",\"value\":{}}}", - gauge.id().name(), - gauge.get() - ); - } - } - for histogram in Collector::collect_histograms(true) { - if metric_types.is_empty() || metric_types.contains(&histogram.id()) { - if !is_first { - metrics.push(','); - } else { - is_first = false; - } - let _ = write!( - &mut metrics, - "{{\"id\":\"{}\",\"type\":\"histogram\",\"count\":{},\"sum\":{}}}", - histogram.id().name(), - histogram.count(), - histogram.sum() - ); - } - } - metrics.push_str("]\n\n"); - - yield Ok(Frame::data(Bytes::from(metrics))); - tokio::time::sleep(interval).await; } - }, - ))), - }) + for gauge in Collector::collect_gauges(true) { + if metric_types.is_empty() || metric_types.contains(&gauge.id()) { + if !is_first { + metrics.push(','); + } else { + is_first = false; + } + let _ = write!( + &mut metrics, + "{{\"id\":\"{}\",\"type\":\"gauge\",\"value\":{}}}", + gauge.id().name(), + gauge.get() + ); + } + } + for histogram in Collector::collect_histograms(true) { + if metric_types.is_empty() || metric_types.contains(&histogram.id()) { + if !is_first { + metrics.push(','); + } else { + is_first = false; + } + let _ = write!( + &mut metrics, + "{{\"id\":\"{}\",\"type\":\"histogram\",\"count\":{},\"sum\":{}}}", + histogram.id().name(), + histogram.count(), + histogram.sum() + ); + } + } + metrics.push_str("]\n\n"); + + yield Ok(Frame::data(Bytes::from(metrics))); + tokio::time::sleep(interval).await; + } + }, + )))) } _ => Err(trc::ResourceEvent::NotFound.into_err()), } diff --git a/crates/http/src/management/troubleshoot.rs b/crates/http/src/management/troubleshoot.rs index b0f7fe58..bb4429e9 100644 --- a/crates/http/src/management/troubleshoot.rs +++ b/crates/http/src/management/troubleshoot.rs @@ -89,20 +89,15 @@ impl TroubleshootApi for Server { timeout, ); - Ok(HttpResponse { - status: StatusCode::OK, - content_type: "text/event-stream".into(), - content_disposition: "".into(), - cache_control: "no-store".into(), - body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new( - async_stream::stream! { - while let Some(stage) = rx.recv().await { - yield Ok(stage.to_frame()); - } - yield Ok(DeliveryStage::Completed.to_frame()); - }, - ))), - }) + Ok(HttpResponse::new(StatusCode::OK) + .with_content_type("text/event-stream") + .with_cache_control("no-store") + .with_stream_body(BoxBody::new(StreamBody::new(async_stream::stream! { + while let Some(stage) = rx.recv().await { + yield Ok(stage.to_frame()); + } + yield Ok(DeliveryStage::Completed.to_frame()); + })))) } ("dmarc", None, &Method::POST) => { let request = serde_json::from_slice::( diff --git a/crates/http/src/request.rs b/crates/http/src/request.rs index 290def79..633ffcdb 100644 --- a/crates/http/src/request.rs +++ b/crates/http/src/request.rs @@ -1,3 +1,9 @@ +/* + * SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd + * + * SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL + */ + use std::{net::IpAddr, sync::Arc}; use common::{ @@ -8,6 +14,7 @@ use common::{ listener::{SessionData, SessionManager, SessionStream}, manager::webadmin::Resource, }; +use dav::{DavMethod, DavResource, request::DavRequestHandler}; use directory::Permission; use http_proto::{ DownloadResponse, HttpContext, HttpRequest, HttpResponse, HttpResponseBody, HttpSessionData, @@ -107,7 +114,7 @@ impl ParseHttp for Server { })?; return Ok(self - .handle_request(request, access_token, &session) + .handle_jmap_request(request, access_token, &session) .await .into_http_response()); } @@ -197,6 +204,41 @@ impl ParseHttp for Server { _ => (), } } + "dav" => { + let response = match ( + path.next().and_then(DavResource::parse), + DavMethod::parse(req.method()), + ) { + (Some(resource), Some(DavMethod::OPTIONS)) => resource.into_options_response(), + (Some(resource), Some(method)) => { + // Authenticate request + let (_in_flight, access_token) = + self.authenticate_headers(&req, &session, false).await?; + let body = if method.has_body() { + fetch_body( + &mut req, + if !access_token.has_permission(Permission::UnlimitedUploads) { + self.core.jmap.upload_max_size + } else { + 0 + }, + session.session_id, + ) + .await + .ok_or_else(|| trc::LimitEvent::SizeRequest.into_err())? + } else { + Vec::new() + }; + + self.handle_dav_request(req, access_token, &session, resource, method, body) + .await + } + (_, None) => HttpResponse::new(StatusCode::METHOD_NOT_ALLOWED), + (None, _) => HttpResponse::new(StatusCode::NOT_FOUND), + }; + + return Ok(response); + } ".well-known" => match (path.next().unwrap_or_default(), req.method()) { ("jmap", &Method::GET) => { // Authenticate request @@ -208,6 +250,18 @@ impl ParseHttp for Server { .await .map(|s| s.into_http_response()); } + ("caldav", &Method::GET) => { + let base_url = ctx.resolve_response_url(self).await; + return Ok(HttpResponse::new(StatusCode::TEMPORARY_REDIRECT) + .with_no_cache() + .with_location(format!("{base_url}/dav/cal"))); + } + ("carddav", &Method::GET) => { + let base_url = ctx.resolve_response_url(self).await; + return Ok(HttpResponse::new(StatusCode::TEMPORARY_REDIRECT) + .with_no_cache() + .with_location(format!("{base_url}/dav/card"))); + } ("oauth-authorization-server", &Method::GET) => { // Limit anonymous requests self.is_http_anonymous_request_allowed(&session.remote_ip) @@ -690,13 +744,13 @@ async fn handle_session(inner: Arc, session: SessionDat trc::event!( Http(trc::HttpEvent::ResponseBody), SpanId = session.session_id, - Contents = match &response.body { + Contents = match response.body() { HttpResponseBody::Text(value) => trc::Value::String(value.clone()), HttpResponseBody::Binary(_) => trc::Value::Static("[binary data]"), HttpResponseBody::Stream(_) => trc::Value::Static("[stream]"), _ => trc::Value::None, }, - Code = response.status.as_u16(), + Code = response.status().as_u16(), Size = response.size(), ); diff --git a/crates/jmap/src/api/event_source.rs b/crates/jmap/src/api/event_source.rs index 8cc0aa70..d26621f7 100644 --- a/crates/jmap/src/api/event_source.rs +++ b/crates/jmap/src/api/event_source.rs @@ -105,12 +105,10 @@ impl EventSourceHandler for Server { .subscribe_state_manager(access_token.primary_id(), types) .await?; - Ok(HttpResponse { - status: StatusCode::OK, - content_type: "text/event-stream".into(), - content_disposition: "".into(), - cache_control: "no-store".into(), - body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new(async_stream::stream! { + Ok(HttpResponse::new(StatusCode::OK) + .with_content_type("text/event-stream") + .with_cache_control("no-store") + .with_stream_body(BoxBody::new(StreamBody::new(async_stream::stream! { let mut last_message = Instant::now() - throttle; let mut timeout = ping.as_ref().map(|p| p.interval).unwrap_or(LONG_1D_SLUMBER); @@ -162,7 +160,6 @@ impl EventSourceHandler for Server { LONG_1D_SLUMBER }; } - }))), - }) + })))) } } diff --git a/crates/jmap/src/api/mod.rs b/crates/jmap/src/api/mod.rs index 9f4e9df6..49a89706 100644 --- a/crates/jmap/src/api/mod.rs +++ b/crates/jmap/src/api/mod.rs @@ -42,11 +42,9 @@ impl ToJmapHttpResponse for Session { impl ToJmapHttpResponse for RequestError<'_> { fn into_http_response(self) -> HttpResponse { - HttpResponse::new_text( - StatusCode::from_u16(self.status).unwrap_or(StatusCode::BAD_REQUEST), - "application/problem+json", - serde_json::to_string(&self).unwrap_or_default(), - ) + HttpResponse::new(StatusCode::from_u16(self.status).unwrap_or(StatusCode::BAD_REQUEST)) + .with_content_type("application/problem+json") + .with_text_body(serde_json::to_string(&self).unwrap_or_default()) } } diff --git a/crates/jmap/src/api/request.rs b/crates/jmap/src/api/request.rs index 015a60d2..9fdd3928 100644 --- a/crates/jmap/src/api/request.rs +++ b/crates/jmap/src/api/request.rs @@ -43,7 +43,7 @@ use crate::{ use std::future::Future; pub trait RequestHandler: Sync + Send { - fn handle_request( + fn handle_jmap_request( &self, request: Request, access_token: Arc, @@ -61,7 +61,7 @@ pub trait RequestHandler: Sync + Send { } impl RequestHandler for Server { - async fn handle_request( + async fn handle_jmap_request( &self, request: Request, access_token: Arc, diff --git a/crates/jmap/src/websocket/stream.rs b/crates/jmap/src/websocket/stream.rs index 96c871ad..59afdf32 100644 --- a/crates/jmap/src/websocket/stream.rs +++ b/crates/jmap/src/websocket/stream.rs @@ -97,7 +97,7 @@ impl WebSocketHandler for Server { ) { Ok(WebSocketMessage::Request(request)) => { let response = self - .handle_request( + .handle_jmap_request( request.request, access_token.clone(), &session, diff --git a/crates/jmap/src/websocket/upgrade.rs b/crates/jmap/src/websocket/upgrade.rs index 9fdfa5cb..9a53eb06 100644 --- a/crates/jmap/src/websocket/upgrade.rs +++ b/crates/jmap/src/websocket/upgrade.rs @@ -102,12 +102,6 @@ impl WebSocketUpgrade for Server { } }); - Ok(HttpResponse { - status: StatusCode::SWITCHING_PROTOCOLS, - content_type: "".into(), - content_disposition: "".into(), - cache_control: "".into(), - body: HttpResponseBody::WebsocketUpgrade(derived_key), - }) + Ok(HttpResponse::new(StatusCode::SWITCHING_PROTOCOLS).with_websocket_upgrade(derived_key)) } }