mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-10-28 21:36:08 +08:00
DAV skeleton
This commit is contained in:
parent
b0a486106e
commit
1c460c7f3b
15 changed files with 515 additions and 298 deletions
20
Cargo.lock
generated
20
Cargo.lock
generated
|
|
@ -1669,6 +1669,25 @@ checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
|
|||
[[package]]
|
||||
name = "dav"
|
||||
version = "0.11.5"
|
||||
dependencies = [
|
||||
"common",
|
||||
"dav-proto",
|
||||
"groupware",
|
||||
"hashify",
|
||||
"http_proto",
|
||||
"hyper 1.6.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dav-proto"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"calcard",
|
||||
"hashify",
|
||||
"hyper 1.6.0",
|
||||
"mail-parser",
|
||||
"quick-xml 0.37.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dbl"
|
||||
|
|
@ -2988,6 +3007,7 @@ dependencies = [
|
|||
"base64 0.22.1",
|
||||
"chrono",
|
||||
"common",
|
||||
"dav",
|
||||
"directory",
|
||||
"email",
|
||||
"form-data",
|
||||
|
|
|
|||
|
|
@ -5,6 +5,12 @@ edition = "2024"
|
|||
resolver = "2"
|
||||
|
||||
[dependencies]
|
||||
dav-proto = { path = "/Users/me/code/dav-proto" }
|
||||
common = { path = "../common" }
|
||||
groupware = { path = "../groupware" }
|
||||
http_proto = { path = "../http-proto" }
|
||||
hashify = { version = "0.2" }
|
||||
hyper = { version = "1.0.1", features = ["server", "http1", "http2"] }
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
|
|
|
|||
|
|
@ -3,3 +3,90 @@
|
|||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
|
||||
*/
|
||||
|
||||
pub mod request;
|
||||
|
||||
use http_proto::HttpResponse;
|
||||
use hyper::{Method, StatusCode};
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum DavResource {
|
||||
Card,
|
||||
Cal,
|
||||
File,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum DavMethod {
|
||||
GET,
|
||||
PUT,
|
||||
POST,
|
||||
DELETE,
|
||||
PATCH,
|
||||
PROPFIND,
|
||||
PROPPATCH,
|
||||
REPORT,
|
||||
MKCOL,
|
||||
COPY,
|
||||
MOVE,
|
||||
LOCK,
|
||||
UNLOCK,
|
||||
OPTIONS,
|
||||
}
|
||||
|
||||
impl DavResource {
|
||||
pub fn parse(service: &str) -> Option<Self> {
|
||||
hashify::tiny_map!(service.as_bytes(),
|
||||
"card" => DavResource::Card,
|
||||
"cal" => DavResource::Cal,
|
||||
"file" => DavResource::File
|
||||
)
|
||||
}
|
||||
|
||||
pub fn into_options_response(self) -> HttpResponse {
|
||||
let todo = "true";
|
||||
HttpResponse::new(StatusCode::OK)
|
||||
.with_header("DAV", "1, 2, 3, access-control, calendar-access")
|
||||
}
|
||||
}
|
||||
|
||||
impl DavMethod {
|
||||
pub fn parse(method: &Method) -> Option<Self> {
|
||||
match *method {
|
||||
Method::GET => Some(DavMethod::GET),
|
||||
Method::PUT => Some(DavMethod::PUT),
|
||||
Method::DELETE => Some(DavMethod::DELETE),
|
||||
Method::OPTIONS => Some(DavMethod::OPTIONS),
|
||||
Method::POST => Some(DavMethod::POST),
|
||||
Method::PATCH => Some(DavMethod::PATCH),
|
||||
_ => {
|
||||
hashify::tiny_map!(method.as_str().as_bytes(),
|
||||
"PROPFIND" => DavMethod::PROPFIND,
|
||||
"PROPPATCH" => DavMethod::PROPPATCH,
|
||||
"REPORT" => DavMethod::REPORT,
|
||||
"MKCOL" => DavMethod::MKCOL,
|
||||
"COPY" => DavMethod::COPY,
|
||||
"MOVE" => DavMethod::MOVE,
|
||||
"LOCK" => DavMethod::LOCK,
|
||||
"UNLOCK" => DavMethod::UNLOCK
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn has_body(self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
DavMethod::PUT
|
||||
| DavMethod::POST
|
||||
| DavMethod::PATCH
|
||||
| DavMethod::PROPPATCH
|
||||
| DavMethod::PROPFIND
|
||||
| DavMethod::REPORT
|
||||
| DavMethod::MKCOL
|
||||
| DavMethod::COPY
|
||||
| DavMethod::MOVE
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
38
crates/dav/src/request.rs
Normal file
38
crates/dav/src/request.rs
Normal file
|
|
@ -0,0 +1,38 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
|
||||
*/
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{Server, auth::AccessToken};
|
||||
use http_proto::{HttpRequest, HttpResponse, HttpSessionData};
|
||||
|
||||
use crate::{DavMethod, DavResource};
|
||||
|
||||
pub trait DavRequestHandler: Sync + Send {
|
||||
fn handle_dav_request(
|
||||
&self,
|
||||
request: HttpRequest,
|
||||
access_token: Arc<AccessToken>,
|
||||
session: &HttpSessionData,
|
||||
resource: DavResource,
|
||||
method: DavMethod,
|
||||
body: Vec<u8>,
|
||||
) -> impl Future<Output = HttpResponse> + Send;
|
||||
}
|
||||
|
||||
impl DavRequestHandler for Server {
|
||||
async fn handle_dav_request(
|
||||
&self,
|
||||
request: HttpRequest,
|
||||
access_token: Arc<AccessToken>,
|
||||
session: &HttpSessionData,
|
||||
resource: DavResource,
|
||||
method: DavMethod,
|
||||
body: Vec<u8>,
|
||||
) -> HttpResponse {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
|
@ -10,7 +10,7 @@ pub mod response;
|
|||
|
||||
pub use form_urlencoded;
|
||||
|
||||
use std::{borrow::Cow, net::IpAddr, sync::Arc};
|
||||
use std::{net::IpAddr, sync::Arc};
|
||||
|
||||
use common::listener::ServerInstance;
|
||||
use hyper::StatusCode;
|
||||
|
|
@ -37,11 +37,9 @@ pub enum HttpResponseBody {
|
|||
}
|
||||
|
||||
pub struct HttpResponse {
|
||||
pub status: StatusCode,
|
||||
pub content_type: Cow<'static, str>,
|
||||
pub content_disposition: Cow<'static, str>,
|
||||
pub cache_control: Cow<'static, str>,
|
||||
pub body: HttpResponseBody,
|
||||
status: StatusCode,
|
||||
builder: hyper::http::response::Builder,
|
||||
body: HttpResponseBody,
|
||||
}
|
||||
|
||||
pub struct HttpContext<'x> {
|
||||
|
|
|
|||
|
|
@ -4,11 +4,13 @@
|
|||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
|
||||
*/
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
||||
use common::manager::webadmin::Resource;
|
||||
use http_body_util::{BodyExt, Full};
|
||||
use hyper::{StatusCode, body::Bytes, header};
|
||||
use hyper::{
|
||||
StatusCode,
|
||||
body::Bytes,
|
||||
header::{self, HeaderName, HeaderValue},
|
||||
};
|
||||
use serde_json::json;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -17,42 +19,91 @@ use crate::{
|
|||
};
|
||||
|
||||
impl HttpResponse {
|
||||
pub fn new_empty(status: StatusCode) -> Self {
|
||||
pub fn new(status: StatusCode) -> Self {
|
||||
HttpResponse {
|
||||
status,
|
||||
content_type: "".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "".into(),
|
||||
builder: hyper::Response::builder().status(status),
|
||||
body: HttpResponseBody::Empty,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_text(
|
||||
status: StatusCode,
|
||||
content_type: impl Into<Cow<'static, str>>,
|
||||
body: impl Into<String>,
|
||||
) -> Self {
|
||||
HttpResponse {
|
||||
status,
|
||||
content_type: content_type.into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "".into(),
|
||||
body: HttpResponseBody::Text(body.into()),
|
||||
}
|
||||
pub fn with_content_type<V>(mut self, content_type: V) -> Self
|
||||
where
|
||||
V: TryInto<HeaderValue>,
|
||||
<V as TryInto<HeaderValue>>::Error: Into<hyper::http::Error>,
|
||||
{
|
||||
self.builder = self.builder.header(header::CONTENT_TYPE, content_type);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn new_binary(
|
||||
status: StatusCode,
|
||||
content_type: impl Into<Cow<'static, str>>,
|
||||
body: impl Into<Vec<u8>>,
|
||||
pub fn with_header<K, V>(mut self, name: K, value: V) -> Self
|
||||
where
|
||||
K: TryInto<HeaderName>,
|
||||
<K as TryInto<HeaderName>>::Error: Into<hyper::http::Error>,
|
||||
V: TryInto<HeaderValue>,
|
||||
<V as TryInto<HeaderValue>>::Error: Into<hyper::http::Error>,
|
||||
{
|
||||
self.builder = self.builder.header(name, value);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_text_body(mut self, body: impl Into<String>) -> Self {
|
||||
self.body = HttpResponseBody::Text(body.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_binary_body(mut self, body: impl Into<Vec<u8>>) -> Self {
|
||||
self.body = HttpResponseBody::Binary(body.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_stream_body(
|
||||
mut self,
|
||||
stream: http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>,
|
||||
) -> Self {
|
||||
HttpResponse {
|
||||
status,
|
||||
content_type: content_type.into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "".into(),
|
||||
body: HttpResponseBody::Binary(body.into()),
|
||||
}
|
||||
self.body = HttpResponseBody::Stream(stream);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_websocket_upgrade(mut self, derived_key: String) -> Self {
|
||||
self.body = HttpResponseBody::WebsocketUpgrade(derived_key);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_content_disposition<V>(mut self, content_disposition: V) -> Self
|
||||
where
|
||||
V: TryInto<HeaderValue>,
|
||||
<V as TryInto<HeaderValue>>::Error: Into<hyper::http::Error>,
|
||||
{
|
||||
self.builder = self
|
||||
.builder
|
||||
.header(header::CONTENT_DISPOSITION, content_disposition);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_cache_control<V>(mut self, cache_control: V) -> Self
|
||||
where
|
||||
V: TryInto<HeaderValue>,
|
||||
<V as TryInto<HeaderValue>>::Error: Into<hyper::http::Error>,
|
||||
{
|
||||
self.builder = self.builder.header(header::CACHE_CONTROL, cache_control);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_no_cache(mut self) -> Self {
|
||||
self.builder = self
|
||||
.builder
|
||||
.header(header::CACHE_CONTROL, "no-store, no-cache, must-revalidate");
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_location<V>(mut self, location: V) -> Self
|
||||
where
|
||||
V: TryInto<HeaderValue>,
|
||||
<V as TryInto<HeaderValue>>::Error: Into<hyper::http::Error>,
|
||||
{
|
||||
self.builder = self.builder.header(header::LOCATION, location);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn size(&self) -> usize {
|
||||
|
|
@ -67,46 +118,25 @@ impl HttpResponse {
|
|||
self,
|
||||
) -> hyper::Response<http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>>
|
||||
{
|
||||
let builder = hyper::Response::builder().status(self.status);
|
||||
|
||||
match self.body {
|
||||
HttpResponseBody::Text(body) => builder
|
||||
.header(header::CONTENT_TYPE, self.content_type.as_ref())
|
||||
.body(
|
||||
Full::new(Bytes::from(body))
|
||||
.map_err(|never| match never {})
|
||||
.boxed(),
|
||||
),
|
||||
HttpResponseBody::Binary(body) => {
|
||||
let mut builder = builder.header(header::CONTENT_TYPE, self.content_type.as_ref());
|
||||
|
||||
if !self.content_disposition.is_empty() {
|
||||
builder = builder.header(
|
||||
header::CONTENT_DISPOSITION,
|
||||
self.content_disposition.as_ref(),
|
||||
);
|
||||
}
|
||||
|
||||
if !self.cache_control.is_empty() {
|
||||
builder = builder.header(header::CACHE_CONTROL, self.cache_control.as_ref());
|
||||
}
|
||||
|
||||
builder.body(
|
||||
Full::new(Bytes::from(body))
|
||||
.map_err(|never| match never {})
|
||||
.boxed(),
|
||||
)
|
||||
}
|
||||
HttpResponseBody::Empty => builder.body(
|
||||
HttpResponseBody::Text(body) => self.builder.body(
|
||||
Full::new(Bytes::from(body))
|
||||
.map_err(|never| match never {})
|
||||
.boxed(),
|
||||
),
|
||||
HttpResponseBody::Binary(body) => self.builder.body(
|
||||
Full::new(Bytes::from(body))
|
||||
.map_err(|never| match never {})
|
||||
.boxed(),
|
||||
),
|
||||
HttpResponseBody::Empty => self.builder.body(
|
||||
Full::new(Bytes::new())
|
||||
.map_err(|never| match never {})
|
||||
.boxed(),
|
||||
),
|
||||
HttpResponseBody::Stream(stream) => builder
|
||||
.header(header::CONTENT_TYPE, self.content_type.as_ref())
|
||||
.header(header::CACHE_CONTROL, self.cache_control.as_ref())
|
||||
.body(stream),
|
||||
HttpResponseBody::WebsocketUpgrade(derived_key) => builder
|
||||
HttpResponseBody::Stream(stream) => self.builder.body(stream),
|
||||
HttpResponseBody::WebsocketUpgrade(derived_key) => self
|
||||
.builder
|
||||
.header(header::CONNECTION, "upgrade")
|
||||
.header(header::UPGRADE, "websocket")
|
||||
.header("Sec-WebSocket-Accept", &derived_key)
|
||||
|
|
@ -119,67 +149,73 @@ impl HttpResponse {
|
|||
}
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn body(&self) -> &HttpResponseBody {
|
||||
&self.body
|
||||
}
|
||||
|
||||
pub fn status(&self) -> StatusCode {
|
||||
self.status
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: serde::Serialize> ToHttpResponse for JsonResponse<T> {
|
||||
fn into_http_response(self) -> HttpResponse {
|
||||
HttpResponse {
|
||||
status: self.status,
|
||||
content_type: "application/json; charset=utf-8".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: if !self.no_cache {
|
||||
""
|
||||
} else {
|
||||
"no-store, no-cache, must-revalidate"
|
||||
}
|
||||
.into(),
|
||||
body: HttpResponseBody::Text(serde_json::to_string(&self.inner).unwrap_or_default()),
|
||||
let response = HttpResponse::new(self.status)
|
||||
.with_content_type("application/json; charset=utf-8")
|
||||
.with_text_body(serde_json::to_string(&self.inner).unwrap_or_default());
|
||||
|
||||
if self.no_cache {
|
||||
response.with_no_cache()
|
||||
} else {
|
||||
response
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ToHttpResponse for DownloadResponse {
|
||||
fn into_http_response(self) -> HttpResponse {
|
||||
HttpResponse {
|
||||
status: StatusCode::OK,
|
||||
content_type: self.content_type.into(),
|
||||
content_disposition: format!(
|
||||
HttpResponse::new(StatusCode::OK)
|
||||
.with_content_type(self.content_type)
|
||||
.with_content_disposition(format!(
|
||||
"attachment; filename=\"{}\"",
|
||||
self.filename.replace('\"', "\\\"")
|
||||
)
|
||||
.into(),
|
||||
cache_control: "private, immutable, max-age=31536000".into(),
|
||||
body: HttpResponseBody::Binary(self.blob),
|
||||
}
|
||||
))
|
||||
.with_cache_control("private, immutable, max-age=31536000")
|
||||
.with_binary_body(self.blob)
|
||||
}
|
||||
}
|
||||
|
||||
impl ToHttpResponse for Resource<Vec<u8>> {
|
||||
fn into_http_response(self) -> HttpResponse {
|
||||
HttpResponse::new_binary(StatusCode::OK, self.content_type, self.contents)
|
||||
HttpResponse::new(StatusCode::OK)
|
||||
.with_content_type(self.content_type.as_ref())
|
||||
.with_binary_body(self.contents)
|
||||
}
|
||||
}
|
||||
|
||||
impl ToHttpResponse for HtmlResponse {
|
||||
fn into_http_response(self) -> HttpResponse {
|
||||
HttpResponse::new_text(self.status, "text/html; charset=utf-8", self.body)
|
||||
HttpResponse::new(self.status)
|
||||
.with_content_type("text/html; charset=utf-8")
|
||||
.with_text_body(self.body)
|
||||
}
|
||||
}
|
||||
|
||||
impl ToHttpResponse for JsonProblemResponse {
|
||||
fn into_http_response(self) -> HttpResponse {
|
||||
HttpResponse::new_text(
|
||||
self.0,
|
||||
"application/problem+json",
|
||||
serde_json::to_string(&json!(
|
||||
{
|
||||
"type": "about:blank",
|
||||
"title": self.0.canonical_reason().unwrap_or_default(),
|
||||
"status": self.0.as_u16(),
|
||||
"detail": self.0.canonical_reason().unwrap_or_default(),
|
||||
}
|
||||
))
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
HttpResponse::new(self.0)
|
||||
.with_content_type("application/problem+json")
|
||||
.with_text_body(
|
||||
serde_json::to_string(&json!(
|
||||
{
|
||||
"type": "about:blank",
|
||||
"title": self.0.canonical_reason().unwrap_or_default(),
|
||||
"status": self.0.as_u16(),
|
||||
"detail": self.0.canonical_reason().unwrap_or_default(),
|
||||
}
|
||||
))
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ trc = { path = "../trc" }
|
|||
email = { path = "../email" }
|
||||
smtp = { path = "../smtp" }
|
||||
jmap = { path = "../jmap" }
|
||||
dav = { path = "../dav" }
|
||||
spam-filter = { path = "../spam-filter" }
|
||||
http_proto = { path = "../http-proto" }
|
||||
jmap_proto = { path = "../jmap-proto" }
|
||||
|
|
|
|||
|
|
@ -203,107 +203,104 @@ impl TelemetryApi for Server {
|
|||
let mut events = Vec::new();
|
||||
let mut active_span_ids = AHashSet::new();
|
||||
|
||||
Ok(HttpResponse {
|
||||
status: StatusCode::OK,
|
||||
content_type: "text/event-stream".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "no-store".into(),
|
||||
body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new(
|
||||
async_stream::stream! {
|
||||
let mut last_message = Instant::now() - throttle;
|
||||
let mut timeout = ping_interval;
|
||||
Ok(HttpResponse::new(StatusCode::OK)
|
||||
.with_content_type("text/event-stream")
|
||||
.with_cache_control("no-store")
|
||||
.with_stream_body(BoxBody::new(StreamBody::new(
|
||||
async_stream::stream! {
|
||||
let mut last_message = Instant::now() - throttle;
|
||||
let mut timeout = ping_interval;
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(timeout, rx.recv()).await {
|
||||
Ok(Some(event_batch)) => {
|
||||
for event in event_batch {
|
||||
if (filter.is_none() && key_filters.is_empty())
|
||||
|| event
|
||||
.span_id()
|
||||
.is_some_and(|span_id| active_span_ids.contains(&span_id))
|
||||
loop {
|
||||
match tokio::time::timeout(timeout, rx.recv()).await {
|
||||
Ok(Some(event_batch)) => {
|
||||
for event in event_batch {
|
||||
if (filter.is_none() && key_filters.is_empty())
|
||||
|| event
|
||||
.span_id()
|
||||
.is_some_and(|span_id| active_span_ids.contains(&span_id))
|
||||
{
|
||||
events.push(event);
|
||||
} else {
|
||||
let mut matched_keys = AHashSet::new();
|
||||
for (key, value) in event
|
||||
.keys
|
||||
.iter()
|
||||
.chain(event.inner.span.as_ref().map_or(([]).iter(), |s| s.keys.iter()))
|
||||
{
|
||||
events.push(event);
|
||||
} else {
|
||||
let mut matched_keys = AHashSet::new();
|
||||
for (key, value) in event
|
||||
.keys
|
||||
.iter()
|
||||
.chain(event.inner.span.as_ref().map_or(([]).iter(), |s| s.keys.iter()))
|
||||
{
|
||||
if let Some(needle) = key_filters.get(key).or(filter.as_ref()) {
|
||||
let matches = match value {
|
||||
Value::Static(haystack) => haystack.contains(needle),
|
||||
Value::String(haystack) => haystack.contains(needle),
|
||||
Value::Timestamp(haystack) => {
|
||||
DateTime::from_timestamp(*haystack as i64)
|
||||
.to_rfc3339()
|
||||
.contains(needle)
|
||||
}
|
||||
Value::Bool(true) => needle == "true",
|
||||
Value::Bool(false) => needle == "false",
|
||||
Value::Ipv4(haystack) => haystack.to_string().contains(needle),
|
||||
Value::Ipv6(haystack) => haystack.to_string().contains(needle),
|
||||
Value::Event(_) |
|
||||
Value::Array(_) |
|
||||
Value::UInt(_) |
|
||||
Value::Int(_) |
|
||||
Value::Float(_) |
|
||||
Value::Duration(_) |
|
||||
Value::Bytes(_) |
|
||||
Value::None => false,
|
||||
};
|
||||
if let Some(needle) = key_filters.get(key).or(filter.as_ref()) {
|
||||
let matches = match value {
|
||||
Value::Static(haystack) => haystack.contains(needle),
|
||||
Value::String(haystack) => haystack.contains(needle),
|
||||
Value::Timestamp(haystack) => {
|
||||
DateTime::from_timestamp(*haystack as i64)
|
||||
.to_rfc3339()
|
||||
.contains(needle)
|
||||
}
|
||||
Value::Bool(true) => needle == "true",
|
||||
Value::Bool(false) => needle == "false",
|
||||
Value::Ipv4(haystack) => haystack.to_string().contains(needle),
|
||||
Value::Ipv6(haystack) => haystack.to_string().contains(needle),
|
||||
Value::Event(_) |
|
||||
Value::Array(_) |
|
||||
Value::UInt(_) |
|
||||
Value::Int(_) |
|
||||
Value::Float(_) |
|
||||
Value::Duration(_) |
|
||||
Value::Bytes(_) |
|
||||
Value::None => false,
|
||||
};
|
||||
|
||||
if matches {
|
||||
matched_keys.insert(*key);
|
||||
if filter.is_some() || matched_keys.len() == key_filters.len() {
|
||||
if let Some(span_id) = event.span_id() {
|
||||
active_span_ids.insert(span_id);
|
||||
}
|
||||
events.push(event);
|
||||
break;
|
||||
if matches {
|
||||
matched_keys.insert(*key);
|
||||
if filter.is_some() || matched_keys.len() == key_filters.len() {
|
||||
if let Some(span_id) = event.span_id() {
|
||||
active_span_ids.insert(span_id);
|
||||
}
|
||||
events.push(event);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
break;
|
||||
}
|
||||
Err(_) => (),
|
||||
}
|
||||
|
||||
timeout = if !events.is_empty() {
|
||||
let elapsed = last_message.elapsed();
|
||||
if elapsed >= throttle {
|
||||
last_message = Instant::now();
|
||||
yield Ok(Frame::data(Bytes::from(format!(
|
||||
"event: trace\ndata: {}\n\n",
|
||||
serde_json::to_string(
|
||||
&JsonEventSerializer::new(std::mem::take(&mut events))
|
||||
.with_description()
|
||||
.with_explanation()).unwrap_or_default()
|
||||
))));
|
||||
|
||||
ping_interval
|
||||
} else {
|
||||
throttle - elapsed
|
||||
}
|
||||
} else {
|
||||
let elapsed = last_ping.elapsed();
|
||||
if elapsed >= ping_interval {
|
||||
last_ping = Instant::now();
|
||||
yield Ok(Frame::data(ping_payload.clone()));
|
||||
ping_interval
|
||||
} else {
|
||||
ping_interval - elapsed
|
||||
}
|
||||
};
|
||||
Ok(None) => {
|
||||
break;
|
||||
}
|
||||
Err(_) => (),
|
||||
}
|
||||
},
|
||||
))),
|
||||
})
|
||||
|
||||
timeout = if !events.is_empty() {
|
||||
let elapsed = last_message.elapsed();
|
||||
if elapsed >= throttle {
|
||||
last_message = Instant::now();
|
||||
yield Ok(Frame::data(Bytes::from(format!(
|
||||
"event: trace\ndata: {}\n\n",
|
||||
serde_json::to_string(
|
||||
&JsonEventSerializer::new(std::mem::take(&mut events))
|
||||
.with_description()
|
||||
.with_explanation()).unwrap_or_default()
|
||||
))));
|
||||
|
||||
ping_interval
|
||||
} else {
|
||||
throttle - elapsed
|
||||
}
|
||||
} else {
|
||||
let elapsed = last_ping.elapsed();
|
||||
if elapsed >= ping_interval {
|
||||
last_ping = Instant::now();
|
||||
yield Ok(Frame::data(ping_payload.clone()));
|
||||
ping_interval
|
||||
} else {
|
||||
ping_interval - elapsed
|
||||
}
|
||||
};
|
||||
}
|
||||
},
|
||||
))))
|
||||
}
|
||||
("trace", id, &Method::GET) => {
|
||||
// Validate the access token
|
||||
|
|
@ -474,73 +471,69 @@ impl TelemetryApi for Server {
|
|||
}
|
||||
}
|
||||
|
||||
Ok(HttpResponse {
|
||||
status: StatusCode::OK,
|
||||
content_type: "text/event-stream".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "no-store".into(),
|
||||
body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new(
|
||||
async_stream::stream! {
|
||||
Ok(HttpResponse::new(StatusCode::OK)
|
||||
.with_content_type("text/event-stream")
|
||||
.with_cache_control("no-store")
|
||||
.with_stream_body(BoxBody::new(StreamBody::new(
|
||||
async_stream::stream! {
|
||||
loop {
|
||||
let mut metrics = String::with_capacity(512);
|
||||
metrics.push_str("event: metrics\ndata: [");
|
||||
let mut is_first = true;
|
||||
|
||||
loop {
|
||||
let mut metrics = String::with_capacity(512);
|
||||
metrics.push_str("event: metrics\ndata: [");
|
||||
let mut is_first = true;
|
||||
|
||||
for counter in Collector::collect_counters(true) {
|
||||
if event_types.is_empty() || event_types.contains(&counter.id()) {
|
||||
if !is_first {
|
||||
metrics.push(',');
|
||||
} else {
|
||||
is_first = false;
|
||||
}
|
||||
let _ = write!(
|
||||
&mut metrics,
|
||||
"{{\"id\":\"{}\",\"type\":\"counter\",\"value\":{}}}",
|
||||
counter.id().name(),
|
||||
counter.value()
|
||||
);
|
||||
for counter in Collector::collect_counters(true) {
|
||||
if event_types.is_empty() || event_types.contains(&counter.id()) {
|
||||
if !is_first {
|
||||
metrics.push(',');
|
||||
} else {
|
||||
is_first = false;
|
||||
}
|
||||
let _ = write!(
|
||||
&mut metrics,
|
||||
"{{\"id\":\"{}\",\"type\":\"counter\",\"value\":{}}}",
|
||||
counter.id().name(),
|
||||
counter.value()
|
||||
);
|
||||
}
|
||||
for gauge in Collector::collect_gauges(true) {
|
||||
if metric_types.is_empty() || metric_types.contains(&gauge.id()) {
|
||||
if !is_first {
|
||||
metrics.push(',');
|
||||
} else {
|
||||
is_first = false;
|
||||
}
|
||||
let _ = write!(
|
||||
&mut metrics,
|
||||
"{{\"id\":\"{}\",\"type\":\"gauge\",\"value\":{}}}",
|
||||
gauge.id().name(),
|
||||
gauge.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
for histogram in Collector::collect_histograms(true) {
|
||||
if metric_types.is_empty() || metric_types.contains(&histogram.id()) {
|
||||
if !is_first {
|
||||
metrics.push(',');
|
||||
} else {
|
||||
is_first = false;
|
||||
}
|
||||
let _ = write!(
|
||||
&mut metrics,
|
||||
"{{\"id\":\"{}\",\"type\":\"histogram\",\"count\":{},\"sum\":{}}}",
|
||||
histogram.id().name(),
|
||||
histogram.count(),
|
||||
histogram.sum()
|
||||
);
|
||||
}
|
||||
}
|
||||
metrics.push_str("]\n\n");
|
||||
|
||||
yield Ok(Frame::data(Bytes::from(metrics)));
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
},
|
||||
))),
|
||||
})
|
||||
for gauge in Collector::collect_gauges(true) {
|
||||
if metric_types.is_empty() || metric_types.contains(&gauge.id()) {
|
||||
if !is_first {
|
||||
metrics.push(',');
|
||||
} else {
|
||||
is_first = false;
|
||||
}
|
||||
let _ = write!(
|
||||
&mut metrics,
|
||||
"{{\"id\":\"{}\",\"type\":\"gauge\",\"value\":{}}}",
|
||||
gauge.id().name(),
|
||||
gauge.get()
|
||||
);
|
||||
}
|
||||
}
|
||||
for histogram in Collector::collect_histograms(true) {
|
||||
if metric_types.is_empty() || metric_types.contains(&histogram.id()) {
|
||||
if !is_first {
|
||||
metrics.push(',');
|
||||
} else {
|
||||
is_first = false;
|
||||
}
|
||||
let _ = write!(
|
||||
&mut metrics,
|
||||
"{{\"id\":\"{}\",\"type\":\"histogram\",\"count\":{},\"sum\":{}}}",
|
||||
histogram.id().name(),
|
||||
histogram.count(),
|
||||
histogram.sum()
|
||||
);
|
||||
}
|
||||
}
|
||||
metrics.push_str("]\n\n");
|
||||
|
||||
yield Ok(Frame::data(Bytes::from(metrics)));
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
},
|
||||
))))
|
||||
}
|
||||
_ => Err(trc::ResourceEvent::NotFound.into_err()),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,20 +89,15 @@ impl TroubleshootApi for Server {
|
|||
timeout,
|
||||
);
|
||||
|
||||
Ok(HttpResponse {
|
||||
status: StatusCode::OK,
|
||||
content_type: "text/event-stream".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "no-store".into(),
|
||||
body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new(
|
||||
async_stream::stream! {
|
||||
while let Some(stage) = rx.recv().await {
|
||||
yield Ok(stage.to_frame());
|
||||
}
|
||||
yield Ok(DeliveryStage::Completed.to_frame());
|
||||
},
|
||||
))),
|
||||
})
|
||||
Ok(HttpResponse::new(StatusCode::OK)
|
||||
.with_content_type("text/event-stream")
|
||||
.with_cache_control("no-store")
|
||||
.with_stream_body(BoxBody::new(StreamBody::new(async_stream::stream! {
|
||||
while let Some(stage) = rx.recv().await {
|
||||
yield Ok(stage.to_frame());
|
||||
}
|
||||
yield Ok(DeliveryStage::Completed.to_frame());
|
||||
}))))
|
||||
}
|
||||
("dmarc", None, &Method::POST) => {
|
||||
let request = serde_json::from_slice::<DmarcTroubleshootRequest>(
|
||||
|
|
|
|||
|
|
@ -1,3 +1,9 @@
|
|||
/*
|
||||
* SPDX-FileCopyrightText: 2020 Stalwart Labs Ltd <hello@stalw.art>
|
||||
*
|
||||
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
|
||||
*/
|
||||
|
||||
use std::{net::IpAddr, sync::Arc};
|
||||
|
||||
use common::{
|
||||
|
|
@ -8,6 +14,7 @@ use common::{
|
|||
listener::{SessionData, SessionManager, SessionStream},
|
||||
manager::webadmin::Resource,
|
||||
};
|
||||
use dav::{DavMethod, DavResource, request::DavRequestHandler};
|
||||
use directory::Permission;
|
||||
use http_proto::{
|
||||
DownloadResponse, HttpContext, HttpRequest, HttpResponse, HttpResponseBody, HttpSessionData,
|
||||
|
|
@ -107,7 +114,7 @@ impl ParseHttp for Server {
|
|||
})?;
|
||||
|
||||
return Ok(self
|
||||
.handle_request(request, access_token, &session)
|
||||
.handle_jmap_request(request, access_token, &session)
|
||||
.await
|
||||
.into_http_response());
|
||||
}
|
||||
|
|
@ -197,6 +204,41 @@ impl ParseHttp for Server {
|
|||
_ => (),
|
||||
}
|
||||
}
|
||||
"dav" => {
|
||||
let response = match (
|
||||
path.next().and_then(DavResource::parse),
|
||||
DavMethod::parse(req.method()),
|
||||
) {
|
||||
(Some(resource), Some(DavMethod::OPTIONS)) => resource.into_options_response(),
|
||||
(Some(resource), Some(method)) => {
|
||||
// Authenticate request
|
||||
let (_in_flight, access_token) =
|
||||
self.authenticate_headers(&req, &session, false).await?;
|
||||
let body = if method.has_body() {
|
||||
fetch_body(
|
||||
&mut req,
|
||||
if !access_token.has_permission(Permission::UnlimitedUploads) {
|
||||
self.core.jmap.upload_max_size
|
||||
} else {
|
||||
0
|
||||
},
|
||||
session.session_id,
|
||||
)
|
||||
.await
|
||||
.ok_or_else(|| trc::LimitEvent::SizeRequest.into_err())?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
self.handle_dav_request(req, access_token, &session, resource, method, body)
|
||||
.await
|
||||
}
|
||||
(_, None) => HttpResponse::new(StatusCode::METHOD_NOT_ALLOWED),
|
||||
(None, _) => HttpResponse::new(StatusCode::NOT_FOUND),
|
||||
};
|
||||
|
||||
return Ok(response);
|
||||
}
|
||||
".well-known" => match (path.next().unwrap_or_default(), req.method()) {
|
||||
("jmap", &Method::GET) => {
|
||||
// Authenticate request
|
||||
|
|
@ -208,6 +250,18 @@ impl ParseHttp for Server {
|
|||
.await
|
||||
.map(|s| s.into_http_response());
|
||||
}
|
||||
("caldav", &Method::GET) => {
|
||||
let base_url = ctx.resolve_response_url(self).await;
|
||||
return Ok(HttpResponse::new(StatusCode::TEMPORARY_REDIRECT)
|
||||
.with_no_cache()
|
||||
.with_location(format!("{base_url}/dav/cal")));
|
||||
}
|
||||
("carddav", &Method::GET) => {
|
||||
let base_url = ctx.resolve_response_url(self).await;
|
||||
return Ok(HttpResponse::new(StatusCode::TEMPORARY_REDIRECT)
|
||||
.with_no_cache()
|
||||
.with_location(format!("{base_url}/dav/card")));
|
||||
}
|
||||
("oauth-authorization-server", &Method::GET) => {
|
||||
// Limit anonymous requests
|
||||
self.is_http_anonymous_request_allowed(&session.remote_ip)
|
||||
|
|
@ -690,13 +744,13 @@ async fn handle_session<T: SessionStream>(inner: Arc<Inner>, session: SessionDat
|
|||
trc::event!(
|
||||
Http(trc::HttpEvent::ResponseBody),
|
||||
SpanId = session.session_id,
|
||||
Contents = match &response.body {
|
||||
Contents = match response.body() {
|
||||
HttpResponseBody::Text(value) => trc::Value::String(value.clone()),
|
||||
HttpResponseBody::Binary(_) => trc::Value::Static("[binary data]"),
|
||||
HttpResponseBody::Stream(_) => trc::Value::Static("[stream]"),
|
||||
_ => trc::Value::None,
|
||||
},
|
||||
Code = response.status.as_u16(),
|
||||
Code = response.status().as_u16(),
|
||||
Size = response.size(),
|
||||
);
|
||||
|
||||
|
|
|
|||
|
|
@ -105,12 +105,10 @@ impl EventSourceHandler for Server {
|
|||
.subscribe_state_manager(access_token.primary_id(), types)
|
||||
.await?;
|
||||
|
||||
Ok(HttpResponse {
|
||||
status: StatusCode::OK,
|
||||
content_type: "text/event-stream".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "no-store".into(),
|
||||
body: HttpResponseBody::Stream(BoxBody::new(StreamBody::new(async_stream::stream! {
|
||||
Ok(HttpResponse::new(StatusCode::OK)
|
||||
.with_content_type("text/event-stream")
|
||||
.with_cache_control("no-store")
|
||||
.with_stream_body(BoxBody::new(StreamBody::new(async_stream::stream! {
|
||||
let mut last_message = Instant::now() - throttle;
|
||||
let mut timeout =
|
||||
ping.as_ref().map(|p| p.interval).unwrap_or(LONG_1D_SLUMBER);
|
||||
|
|
@ -162,7 +160,6 @@ impl EventSourceHandler for Server {
|
|||
LONG_1D_SLUMBER
|
||||
};
|
||||
}
|
||||
}))),
|
||||
})
|
||||
}))))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,11 +42,9 @@ impl ToJmapHttpResponse for Session {
|
|||
|
||||
impl ToJmapHttpResponse for RequestError<'_> {
|
||||
fn into_http_response(self) -> HttpResponse {
|
||||
HttpResponse::new_text(
|
||||
StatusCode::from_u16(self.status).unwrap_or(StatusCode::BAD_REQUEST),
|
||||
"application/problem+json",
|
||||
serde_json::to_string(&self).unwrap_or_default(),
|
||||
)
|
||||
HttpResponse::new(StatusCode::from_u16(self.status).unwrap_or(StatusCode::BAD_REQUEST))
|
||||
.with_content_type("application/problem+json")
|
||||
.with_text_body(serde_json::to_string(&self).unwrap_or_default())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ use crate::{
|
|||
use std::future::Future;
|
||||
|
||||
pub trait RequestHandler: Sync + Send {
|
||||
fn handle_request(
|
||||
fn handle_jmap_request(
|
||||
&self,
|
||||
request: Request,
|
||||
access_token: Arc<AccessToken>,
|
||||
|
|
@ -61,7 +61,7 @@ pub trait RequestHandler: Sync + Send {
|
|||
}
|
||||
|
||||
impl RequestHandler for Server {
|
||||
async fn handle_request(
|
||||
async fn handle_jmap_request(
|
||||
&self,
|
||||
request: Request,
|
||||
access_token: Arc<AccessToken>,
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ impl WebSocketHandler for Server {
|
|||
) {
|
||||
Ok(WebSocketMessage::Request(request)) => {
|
||||
let response = self
|
||||
.handle_request(
|
||||
.handle_jmap_request(
|
||||
request.request,
|
||||
access_token.clone(),
|
||||
&session,
|
||||
|
|
|
|||
|
|
@ -102,12 +102,6 @@ impl WebSocketUpgrade for Server {
|
|||
}
|
||||
});
|
||||
|
||||
Ok(HttpResponse {
|
||||
status: StatusCode::SWITCHING_PROTOCOLS,
|
||||
content_type: "".into(),
|
||||
content_disposition: "".into(),
|
||||
cache_control: "".into(),
|
||||
body: HttpResponseBody::WebsocketUpgrade(derived_key),
|
||||
})
|
||||
Ok(HttpResponse::new(StatusCode::SWITCHING_PROTOCOLS).with_websocket_upgrade(derived_key))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue