Unversioned archiving

This commit is contained in:
mdecimus 2025-05-15 13:35:40 +02:00
parent 839b7189fa
commit 365c87af20
28 changed files with 360 additions and 219 deletions

View file

@ -801,12 +801,13 @@ impl MessageStoreCache {
if is_unique_hash {
hash
} else {
loop {
for _ in 0..u32::MAX {
hash = hash.wrapping_add(1);
if !threads_ids.contains(hash) {
return hash;
}
}
hash
}
}
}

View file

@ -1,7 +1,10 @@
// Adapted from rustls-acme (https://github.com/FlorianUekermann/rustls-acme), licensed under MIT/Apache-2.0.
use std::time::Duration;
use super::AcmeProvider;
use super::jose::{
Body, eab_sign, key_authorization, key_authorization_sha256, key_authorization_sha256_base64,
sign,
};
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use hyper::header::USER_AGENT;
@ -11,17 +14,12 @@ use reqwest::{Method, Response};
use ring::rand::SystemRandom;
use ring::signature::{ECDSA_P256_SHA256_FIXED_SIGNING, EcdsaKeyPair, EcdsaSigningAlgorithm};
use serde::Deserialize;
use store::write::Archiver;
use store::{SERIALIZE_CERT_V1, Serialize, SerializedVersion};
use std::time::Duration;
use store::Serialize;
use store::write::UnversionedArchiver;
use trc::AddContext;
use trc::event::conv::AssertSuccess;
use super::AcmeProvider;
use super::jose::{
Body, eab_sign, key_authorization, key_authorization_sha256, key_authorization_sha256_base64,
sign,
};
pub const LETS_ENCRYPT_STAGING_DIRECTORY: &str =
"https://acme-staging-v02.api.letsencrypt.org/directory";
pub const LETS_ENCRYPT_PRODUCTION_DIRECTORY: &str =
@ -190,7 +188,7 @@ impl Account {
.reason(err)
})?;
Archiver::new(SerializedCert {
UnversionedArchiver::new(SerializedCert {
certificate: cert.serialize_der().map_err(|err| {
trc::EventType::Acme(trc::AcmeEvent::Error)
.caused_by(trc::location!())
@ -210,12 +208,6 @@ pub struct SerializedCert {
pub private_key: Vec<u8>,
}
impl SerializedVersion for SerializedCert {
fn serialize_version() -> u8 {
SERIALIZE_CERT_V1
}
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Directory {

View file

@ -4,8 +4,11 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::sync::Arc;
use super::{
AcmeProvider, StaticResolver,
directory::{ACME_TLS_ALPN_NAME, SerializedCert},
};
use crate::{KV_ACME, Server};
use rustls::{
ServerConfig,
crypto::ring::sign::any_ecdsa_type,
@ -13,19 +16,13 @@ use rustls::{
sign::CertifiedKey,
};
use rustls_pki_types::{CertificateDer, PrivateKeyDer, PrivatePkcs8KeyDer};
use std::sync::Arc;
use store::{
dispatch::lookup::KeyValue,
write::{AlignedBytes, Archive},
write::{AlignedBytes, UnversionedArchive},
};
use trc::AcmeEvent;
use crate::{KV_ACME, Server};
use super::{
AcmeProvider, StaticResolver,
directory::{ACME_TLS_ALPN_NAME, SerializedCert},
};
impl Server {
pub(crate) fn set_cert(&self, provider: &AcmeProvider, cert: Arc<CertifiedKey>) {
// Add certificates
@ -51,7 +48,7 @@ impl Server {
pub(crate) async fn build_acme_certificate(&self, domain: &str) -> Option<Arc<CertifiedKey>> {
match self
.in_memory_store()
.key_get::<Archive<AlignedBytes>>(KeyValue::<()>::build_key(KV_ACME, domain))
.key_get::<UnversionedArchive<AlignedBytes>>(KeyValue::<()>::build_key(KV_ACME, domain))
.await
{
Ok(Some(cert_)) => match cert_.unarchive::<SerializedCert>() {

View file

@ -20,7 +20,7 @@ use std::collections::HashMap;
use store::dispatch::lookup::KeyValue;
use store::write::serialize::rkyv_deserialize;
use store::write::{AlignedBytes, Archive, Archiver, now};
use store::{SERIALIZE_LOCKDATA_V1, Serialize, SerializedVersion, U32_LEN};
use store::{SERIALIZE_DAV_LOCKS_V1, Serialize, SerializedVersion, U32_LEN};
use trc::AddContext;
use super::ETag;
@ -755,7 +755,7 @@ impl<'x> LockCaches<'x> {
impl SerializedVersion for LockData {
fn serialize_version() -> u8 {
SERIALIZE_LOCKDATA_V1
SERIALIZE_DAV_LOCKS_V1
}
}

View file

@ -62,6 +62,7 @@ pub enum PrincipalData {
ExternalMembers(Vec<String>),
Urls(Vec<String>),
PrincipalQuota(Vec<PrincipalQuota>),
Language(String),
}
#[derive(rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Clone, PartialEq, Eq)]

View file

@ -8,6 +8,7 @@ pub mod cache;
pub mod identity;
pub mod mailbox;
pub mod message;
pub mod migration;
pub mod push;
pub mod sieve;
pub mod submission;

View file

@ -21,7 +21,7 @@ use rkyv::{
vec::ArchivedVec,
};
use std::{borrow::Cow, collections::VecDeque};
use store::{SERIALIZE_MSGDATA_V1, SERIALIZE_MSGMETADATA_V1, SerializedVersion};
use store::{SERIALIZE_MESSAGE_DATA_V1, SERIALIZE_MESSAGE_METADATA_V1, SerializedVersion};
use utils::BlobHash;
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive, Debug)]
@ -47,13 +47,13 @@ impl IndexableAndSerializableObject for MessageData {}
impl SerializedVersion for MessageData {
fn serialize_version() -> u8 {
SERIALIZE_MSGDATA_V1
SERIALIZE_MESSAGE_DATA_V1
}
}
impl SerializedVersion for MessageMetadata {
fn serialize_version() -> u8 {
SERIALIZE_MSGMETADATA_V1
SERIALIZE_MESSAGE_METADATA_V1
}
}

View file

@ -0,0 +1,14 @@
use common::Server;
use jmap_proto::types::collection::Collection;
// Migrates the e-mail data of a single account to the new (unversioned)
// storage layout. Currently a stub: it only collects the account's Email
// document ids; the actual migration work is not implemented yet.
//
// NOTE(review): `document_ids` is fetched but never used below — confirm
// this is intentional work-in-progress before release.
async fn migrate_email(server: &Server, account_id: u32) -> trc::Result<()> {
    // Obtain email ids
    let document_ids = server
        .get_document_ids(account_id, Collection::Email)
        .await?
        .unwrap_or_default();
    //TODO remove tombstones
    Ok(())
}

View file

@ -0,0 +1 @@
pub mod email;

View file

@ -12,7 +12,7 @@ use calcard::icalendar::ICalendar;
use common::DavName;
use dav_proto::schema::request::DeadProperty;
use jmap_proto::types::{acl::Acl, value::AclGrant};
use store::{SERIALIZE_CALENDAR_V1, SERIALIZE_CALENDAREVENT_V1, SerializedVersion};
use store::{SERIALIZE_CALENDAR_EVENT_V1, SERIALIZE_CALENDAR_V1, SerializedVersion};
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
@ -188,7 +188,7 @@ impl SerializedVersion for Calendar {
impl SerializedVersion for CalendarEvent {
fn serialize_version() -> u8 {
SERIALIZE_CALENDAREVENT_V1
SERIALIZE_CALENDAR_EVENT_V1
}
}

View file

@ -11,7 +11,7 @@ use calcard::vcard::VCard;
use common::DavName;
use dav_proto::schema::request::DeadProperty;
use jmap_proto::types::{acl::Acl, value::AclGrant};
use store::{SERIALIZE_ADDRESSBOOK_V1, SERIALIZE_CALENDAREVENT_V1, SerializedVersion};
use store::{SERIALIZE_ADDRESS_BOOK_V1, SERIALIZE_CONTACT_V1, SerializedVersion};
#[derive(
rkyv::Archive, rkyv::Deserialize, rkyv::Serialize, Debug, Default, Clone, PartialEq, Eq,
@ -78,12 +78,12 @@ impl From<AddressBookRight> for Acl {
impl SerializedVersion for AddressBook {
fn serialize_version() -> u8 {
SERIALIZE_ADDRESSBOOK_V1
SERIALIZE_ADDRESS_BOOK_V1
}
}
impl SerializedVersion for ContactCard {
fn serialize_version() -> u8 {
SERIALIZE_CALENDAREVENT_V1
SERIALIZE_CONTACT_V1
}
}

View file

@ -9,7 +9,7 @@ pub mod storage;
use dav_proto::schema::request::DeadProperty;
use jmap_proto::types::value::AclGrant;
use store::{SERIALIZE_FILENODE_V1, SerializedVersion};
use store::{SERIALIZE_FILE_NODE_V1, SerializedVersion};
use utils::BlobHash;
#[derive(
@ -40,6 +40,6 @@ pub struct FileProperties {
impl SerializedVersion for FileNode {
fn serialize_version() -> u8 {
SERIALIZE_FILENODE_V1
SERIALIZE_FILE_NODE_V1
}
}

View file

@ -19,7 +19,7 @@ use std::future::Future;
use store::{
Serialize,
dispatch::lookup::KeyValue,
write::{Archive, Archiver},
write::{UnversionedArchive, UnversionedArchiver},
};
use store::{
rand::{
@ -109,7 +109,7 @@ impl OAuthApiHandler for Server {
.collect::<String>();
// Serialize OAuth code
let value = Archiver::new(OAuthCode {
let value = UnversionedArchiver::new(OAuthCode {
status: OAuthStatus::Authorized,
account_id: access_token.primary_id(),
client_id,
@ -151,7 +151,7 @@ impl OAuthApiHandler for Server {
.core
.storage
.lookup
.key_get::<Archive<AlignedBytes>>(KeyValue::<()>::build_key(
.key_get::<UnversionedArchive<AlignedBytes>>(KeyValue::<()>::build_key(
KV_OAUTH,
code.as_bytes(),
))
@ -185,7 +185,7 @@ impl OAuthApiHandler for Server {
KeyValue::with_prefix(
KV_OAUTH,
oauth.params.as_bytes(),
Archiver::new(new_oauth_code)
UnversionedArchiver::new(new_oauth_code)
.serialize()
.caused_by(trc::location!())?,
)
@ -243,7 +243,7 @@ impl OAuthApiHandler for Server {
}
// Add OAuth status
let oauth_code = Archiver::new(OAuthCode {
let oauth_code = UnversionedArchiver::new(OAuthCode {
status: OAuthStatus::Pending,
account_id: u32::MAX,
client_id,

View file

@ -7,7 +7,6 @@
use http_proto::{HttpRequest, request::fetch_body};
use hyper::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
use store::{SERIALIZE_OAUTHCODE_V1, SerializedVersion};
use utils::map::vec_map::VecMap;
pub mod auth;
@ -56,12 +55,6 @@ pub struct OAuthCode {
pub params: String,
}
impl SerializedVersion for OAuthCode {
fn serialize_version() -> u8 {
SERIALIZE_OAUTHCODE_V1
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct DeviceAuthGet {
code: Option<String>,

View file

@ -4,6 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::{
ArchivedOAuthStatus, ErrorType, FormData, MAX_POST_LEN, OAuthCode, OAuthResponse, OAuthStatus,
TokenResponse, registration::ClientRegistrationHandler,
};
use common::{
KV_OAUTH, Server,
auth::{
@ -11,22 +15,15 @@ use common::{
oauth::{GrantType, oidc::StandardClaims},
},
};
use http_proto::*;
use hyper::StatusCode;
use std::future::Future;
use store::{
dispatch::lookup::KeyValue,
write::{AlignedBytes, Archive},
write::{AlignedBytes, UnversionedArchive},
};
use trc::AddContext;
use http_proto::*;
use super::{
ArchivedOAuthStatus, ErrorType, FormData, MAX_POST_LEN, OAuthCode, OAuthResponse, OAuthStatus,
TokenResponse, registration::ClientRegistrationHandler,
};
pub trait TokenHandler: Sync + Send {
fn handle_token_request(
&self,
@ -80,7 +77,7 @@ impl TokenHandler for Server {
.core
.storage
.lookup
.key_get::<Archive<AlignedBytes>>(KeyValue::<()>::build_key(
.key_get::<UnversionedArchive<AlignedBytes>>(KeyValue::<()>::build_key(
KV_OAUTH,
code.as_bytes(),
))
@ -152,7 +149,7 @@ impl TokenHandler for Server {
.core
.storage
.lookup
.key_get::<Archive<AlignedBytes>>(KeyValue::<()>::build_key(
.key_get::<UnversionedArchive<AlignedBytes>>(KeyValue::<()>::build_key(
KV_OAUTH,
device_code.as_bytes(),
))

View file

@ -4,29 +4,27 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::future::Future;
use common::{Server, auth::AccessToken};
use directory::{Permission, Type, backend::internal::manage::ManageDirectory};
use http_proto::{request::decode_path_element, *};
use hyper::Method;
use mail_auth::report::{
Feedback,
tlsrpt::{FailureDetails, Policy, TlsReport},
};
use serde_json::json;
use smtp::reporting::{ReportSerializer, analysis::IncomingReport};
use smtp::reporting::analysis::IncomingReport;
use std::future::Future;
use store::{
Deserialize, IterateParams, Key, U64_LEN, ValueKey,
write::{
AlignedBytes, Archive, BatchBuilder, ReportClass, ValueClass, key::DeserializeBigEndian,
AlignedBytes, BatchBuilder, ReportClass, UnversionedArchive, ValueClass,
key::DeserializeBigEndian,
},
};
use trc::AddContext;
use utils::url_params::UrlParams;
use http_proto::{request::decode_path_element, *};
enum ReportType {
Dmarc,
Tls,
@ -291,10 +289,10 @@ where
{
if let Some(tls) = server
.store()
.get_value::<Archive<AlignedBytes>>(key)
.get_value::<UnversionedArchive<AlignedBytes>>(key)
.await?
{
tls.deserialize::<ReportSerializer<T>>().map(|v| Some(v.0))
tls.deserialize::<T>().map(Some)
} else {
Ok(None)
}
@ -381,45 +379,39 @@ async fn fetch_incoming_reports(
last_id = id;
// TODO: Support filtering chunked records (over 10MB) on FDB
let archive = <Archive<AlignedBytes> as Deserialize>::deserialize(value)?;
let archive =
<UnversionedArchive<AlignedBytes> as Deserialize>::deserialize(value)?;
let matches = if has_filters {
match typ {
ReportType::Dmarc => {
let report =
archive
.deserialize::<ReportSerializer<IncomingReport<mail_auth::report::Report>>>()
.caused_by(trc::location!())?
.0;
let report = archive
.deserialize::<IncomingReport<mail_auth::report::Report>>()
.caused_by(trc::location!())?;
filter.is_none_or( |f| report.contains(f))
filter.is_none_or(|f| report.contains(f))
&& tenant_domains
.as_ref()
.is_none_or( |domains| report.has_domain(domains))
.is_none_or(|domains| report.has_domain(domains))
}
ReportType::Tls => {
let report =
archive
.deserialize::<ReportSerializer<IncomingReport<TlsReport>>>()
.caused_by(trc::location!())?
.0;
let report = archive
.deserialize::<IncomingReport<TlsReport>>()
.caused_by(trc::location!())?;
filter.is_none_or( |f| report.contains(f))
filter.is_none_or(|f| report.contains(f))
&& tenant_domains
.as_ref()
.is_none_or( |domains| report.has_domain(domains))
.is_none_or(|domains| report.has_domain(domains))
}
ReportType::Arf => {
let report =
archive
.deserialize::<ReportSerializer<IncomingReport<Feedback>>>()
.caused_by(trc::location!())?
.0;
let report = archive
.deserialize::<IncomingReport<Feedback>>()
.caused_by(trc::location!())?;
filter.is_none_or( |f| report.contains(f))
filter.is_none_or(|f| report.contains(f))
&& tenant_domains
.as_ref()
.is_none_or( |domains| report.has_domain(domains))
.is_none_or(|domains| report.has_domain(domains))
}
}
} else {

View file

@ -53,7 +53,7 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
);
match tokio::time::timeout(
Duration::from_secs(1 << retry_count.clamp(1, 6)),
Duration::from_secs(1 << retry_count.max(6)),
shutdown_rx.changed(),
)
.await
@ -63,7 +63,6 @@ pub fn spawn_broadcast_subscriber(inner: Arc<Inner>, mut shutdown_rx: watch::Rec
}
Err(_) => {
retry_count += 1;
continue;
}
}

View file

@ -14,7 +14,7 @@ use common::expr::{self, functions::ResolveVariable, *};
use compact_str::ToCompactString;
use smtp_proto::{ArchivedResponse, Response};
use store::{SERIALIZE_QUEUEMSG_V1, SerializedVersion, write::now};
use store::{SERIALIZE_QUEUE_MSG_V1, SerializedVersion, write::now};
use utils::BlobHash;
pub mod dsn;
@ -71,7 +71,7 @@ pub struct Message {
impl SerializedVersion for Message {
fn serialize_version() -> u8 {
SERIALIZE_QUEUEMSG_V1
SERIALIZE_QUEUE_MSG_V1
}
}

View file

@ -19,12 +19,10 @@ use std::{
};
use store::{
Serialize,
write::{Archiver, BatchBuilder, ReportClass, ValueClass, now},
write::{BatchBuilder, ReportClass, UnversionedArchiver, ValueClass, now},
};
use trc::IncomingReportEvent;
use super::ReportSerializer;
enum Compression {
None,
Gzip,
@ -281,12 +279,12 @@ impl AnalyzeReport for Server {
Format::Dmarc(report) => {
batch.set(
ValueClass::Report(ReportClass::Dmarc { id, expires }),
Archiver::new(ReportSerializer(IncomingReport {
UnversionedArchiver::new(IncomingReport {
from,
to,
subject,
report,
}))
})
.serialize()
.unwrap_or_default(),
);
@ -294,12 +292,12 @@ impl AnalyzeReport for Server {
Format::Tls(report) => {
batch.set(
ValueClass::Report(ReportClass::Tls { id, expires }),
Archiver::new(ReportSerializer(IncomingReport {
UnversionedArchiver::new(IncomingReport {
from,
to,
subject,
report,
}))
})
.serialize()
.unwrap_or_default(),
);
@ -307,12 +305,12 @@ impl AnalyzeReport for Server {
Format::Arf(report) => {
batch.set(
ValueClass::Report(ReportClass::Arf { id, expires }),
Archiver::new(ReportSerializer(IncomingReport {
UnversionedArchiver::new(IncomingReport {
from,
to,
subject,
report,
}))
})
.serialize()
.unwrap_or_default(),
);

View file

@ -14,6 +14,12 @@ use common::{
listener::SessionStream,
};
use super::{AggregateTimestamp, SerializedSize};
use crate::{
core::Session,
queue::{DomainPart, RecipientDomain},
reporting::SmtpReporting,
};
use compact_str::ToCompactString;
use mail_auth::{
ArcOutput, AuthenticatedMessage, AuthenticationResults, DkimOutput, DkimResult, DmarcOutput,
@ -24,19 +30,14 @@ use mail_auth::{
};
use store::{
Deserialize, IterateParams, Serialize, ValueKey,
write::{AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, ReportEvent, ValueClass},
write::{
AlignedBytes, BatchBuilder, QueueClass, ReportEvent, UnversionedArchive,
UnversionedArchiver, ValueClass,
},
};
use trc::{AddContext, OutgoingReportEvent};
use utils::config::Rate;
use crate::{
core::Session,
queue::{DomainPart, RecipientDomain},
reporting::SmtpReporting,
};
use super::{AggregateTimestamp, ReportSerializer, SerializedSize};
#[derive(
Debug,
PartialEq,
@ -476,12 +477,12 @@ impl DmarcReporting for Server {
// Deserialize report
let dmarc = match self
.store()
.get_value::<Archive<AlignedBytes>>(ValueKey::from(ValueClass::Queue(
.get_value::<UnversionedArchive<AlignedBytes>>(ValueKey::from(ValueClass::Queue(
QueueClass::DmarcReportHeader(event.clone()),
)))
.await?
{
Some(dmarc) => dmarc.deserialize::<ReportSerializer<DmarcFormat>>()?.0,
Some(dmarc) => dmarc.deserialize::<DmarcFormat>()?,
None => {
return Ok(None);
}
@ -551,9 +552,9 @@ impl DmarcReporting for Server {
.storage
.data
.iterate(IterateParams::new(from_key, to_key).ascending(), |_, v| {
let archive = <Archive<AlignedBytes> as Deserialize>::deserialize(v)?;
let archive = <UnversionedArchive<AlignedBytes> as Deserialize>::deserialize(v)?;
match record_map.entry(archive.deserialize::<ReportSerializer<Record>>()?.0) {
match record_map.entry(archive.deserialize::<Record>()?) {
Entry::Occupied(mut e) => {
*e.get_mut() += 1;
Ok(true)
@ -660,7 +661,7 @@ impl DmarcReporting for Server {
// Write report
builder.set(
ValueClass::Queue(QueueClass::DmarcReportHeader(report_event.clone())),
match Archiver::new(ReportSerializer(entry)).serialize() {
match UnversionedArchiver::new(entry).serialize() {
Ok(data) => data.to_vec(),
Err(err) => {
trc::error!(
@ -677,7 +678,7 @@ impl DmarcReporting for Server {
report_event.seq_id = self.inner.data.queue_id_gen.generate();
builder.set(
ValueClass::Queue(QueueClass::DmarcReportEvent(report_event)),
match Archiver::new(ReportSerializer(event.report_record)).serialize() {
match UnversionedArchiver::new(event.report_record).serialize() {
Ok(data) => data.to_vec(),
Err(err) => {
trc::error!(

View file

@ -19,10 +19,7 @@ use mail_auth::{
};
use mail_parser::DateTime;
use store::{
SERIALIZE_REPORT_V1, SerializedVersion,
write::{ReportEvent, key::KeySerializer},
};
use store::write::{ReportEvent, key::KeySerializer};
use tokio::io::{AsyncRead, AsyncWrite};
use crate::{
@ -356,32 +353,3 @@ impl ReportLock for ReportEvent {
.finalize()
}
}
#[derive(rkyv::Serialize, rkyv::Deserialize, rkyv::Archive)]
#[repr(transparent)]
pub struct ReportSerializer<T>(pub T)
where
T: rkyv::Archive
+ for<'a> rkyv::Serialize<
rkyv::api::high::HighSerializer<
rkyv::util::AlignedVec,
rkyv::ser::allocator::ArenaHandle<'a>,
rkyv::rancor::Error,
>,
>;
impl<T> SerializedVersion for ReportSerializer<T>
where
T: rkyv::Archive
+ for<'a> rkyv::Serialize<
rkyv::api::high::HighSerializer<
rkyv::util::AlignedVec,
rkyv::ser::allocator::ArenaHandle<'a>,
rkyv::rancor::Error,
>,
>,
{
fn serialize_version() -> u8 {
SERIALIZE_REPORT_V1
}
}

View file

@ -4,8 +4,8 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{collections::hash_map::Entry, future::Future, sync::Arc, time::Duration};
use super::{AggregateTimestamp, SerializedSize};
use crate::{queue::RecipientDomain, reporting::SmtpReporting};
use ahash::AHashMap;
use common::{
Server, USER_AGENT,
@ -15,7 +15,6 @@ use common::{
},
ipc::{TlsEvent, ToHash},
};
use mail_auth::{
flate2::{Compression, write::GzEncoder},
mta_sts::{ReportUri, TlsRpt},
@ -23,20 +22,19 @@ use mail_auth::{
DateRange, FailureDetails, Policy, PolicyDetails, PolicyType, Summary, TlsReport,
},
};
use mail_parser::DateTime;
use reqwest::header::CONTENT_TYPE;
use std::fmt::Write;
use std::{collections::hash_map::Entry, future::Future, sync::Arc, time::Duration};
use store::{
Deserialize, IterateParams, Serialize, ValueKey,
write::{AlignedBytes, Archive, Archiver, BatchBuilder, QueueClass, ReportEvent, ValueClass},
write::{
AlignedBytes, BatchBuilder, QueueClass, ReportEvent, UnversionedArchive,
UnversionedArchiver, ValueClass,
},
};
use trc::{AddContext, OutgoingReportEvent};
use crate::{queue::RecipientDomain, reporting::SmtpReporting};
use super::{AggregateTimestamp, ReportSerializer, SerializedSize};
#[derive(Debug, Clone)]
pub struct TlsRptOptions {
pub record: Arc<TlsRpt>,
@ -298,12 +296,12 @@ impl TlsReporting for Server {
for event in events {
let tls = if let Some(tls) = self
.store()
.get_value::<Archive<AlignedBytes>>(ValueKey::from(ValueClass::Queue(
.get_value::<UnversionedArchive<AlignedBytes>>(ValueKey::from(ValueClass::Queue(
QueueClass::TlsReportHeader(event.clone()),
)))
.await?
{
tls.deserialize::<ReportSerializer<TlsFormat>>()?.0
tls.deserialize::<TlsFormat>()?
} else {
continue;
};
@ -336,10 +334,10 @@ impl TlsReporting for Server {
.storage
.data
.iterate(IterateParams::new(from_key, to_key).ascending(), |_, v| {
let archive = <Archive<AlignedBytes> as Deserialize>::deserialize(v)?;
if let Some(failure_details) = archive
.deserialize::<ReportSerializer<Option<FailureDetails>>>()?
.0
let archive =
<UnversionedArchive<AlignedBytes> as Deserialize>::deserialize(v)?;
if let Some(failure_details) =
archive.deserialize::<Option<FailureDetails>>()?
{
match record_map.entry(failure_details) {
Entry::Occupied(mut e) => {
@ -492,7 +490,7 @@ impl TlsReporting for Server {
// Write report
builder.set(
ValueClass::Queue(QueueClass::TlsReportHeader(report_event.clone())),
match Archiver::new(ReportSerializer(entry)).serialize() {
match UnversionedArchiver::new(entry).serialize() {
Ok(data) => data.to_vec(),
Err(err) => {
trc::error!(
@ -509,7 +507,7 @@ impl TlsReporting for Server {
report_event.seq_id = self.inner.data.queue_id_gen.generate();
builder.set(
ValueClass::Queue(QueueClass::TlsReportEvent(report_event)),
match Archiver::new(ReportSerializer(event.failure)).serialize() {
match UnversionedArchiver::new(event.failure).serialize() {
Ok(data) => data.to_vec(),
Err(err) => {
trc::error!(

View file

@ -44,24 +44,22 @@ pub trait SerializeInfallible {
}
// Max 64 versions (2 ^ 6)
pub const SERIALIZE_CERT_V1: u8 = 0;
pub const SERIALIZE_LOCKDATA_V1: u8 = 1;
pub const SERIALIZE_IDENTITY_V1: u8 = 2;
pub const SERIALIZE_MAILBOX_V1: u8 = 3;
pub const SERIALIZE_CRYPTO_V1: u8 = 4;
pub const SERIALIZE_MSGDATA_V1: u8 = 5;
pub const SERIALIZE_MSGMETADATA_V1: u8 = 6;
pub const SERIALIZE_PUSH_V1: u8 = 7;
pub const SERIALIZE_SIEVE_V1: u8 = 8;
pub const SERIALIZE_SUBMISSION_V1: u8 = 9;
pub const SERIALIZE_FILENODE_V1: u8 = 10;
pub const SERIALIZE_OAUTHCODE_V1: u8 = 11;
pub const SERIALIZE_QUEUEMSG_V1: u8 = 12;
pub const SERIALIZE_CALENDAR_V1: u8 = 13;
pub const SERIALIZE_ADDRESSBOOK_V1: u8 = 14;
pub const SERIALIZE_CALENDAREVENT_V1: u8 = 15;
pub const SERIALIZE_PRINCIPAL_V1: u8 = 16;
pub const SERIALIZE_REPORT_V1: u8 = 17;
pub const SERIALIZE_MESSAGE_DATA_V1: u8 = 0;
pub const SERIALIZE_MESSAGE_METADATA_V1: u8 = 1;
pub const SERIALIZE_MAILBOX_V1: u8 = 2;
pub const SERIALIZE_CRYPTO_V1: u8 = 3;
pub const SERIALIZE_IDENTITY_V1: u8 = 4;
pub const SERIALIZE_PUSH_V1: u8 = 5;
pub const SERIALIZE_SIEVE_V1: u8 = 6;
pub const SERIALIZE_SUBMISSION_V1: u8 = 7;
pub const SERIALIZE_QUEUE_MSG_V1: u8 = 8;
pub const SERIALIZE_CALENDAR_V1: u8 = 9;
pub const SERIALIZE_CALENDAR_EVENT_V1: u8 = 10;
pub const SERIALIZE_ADDRESS_BOOK_V1: u8 = 11;
pub const SERIALIZE_CONTACT_V1: u8 = 12;
pub const SERIALIZE_FILE_NODE_V1: u8 = 13;
pub const SERIALIZE_PRINCIPAL_V1: u8 = 14;
pub const SERIALIZE_DAV_LOCKS_V1: u8 = 15;
pub trait SerializedVersion {
fn serialize_version() -> u8;

View file

@ -42,6 +42,11 @@ pub struct Archive<T> {
pub hash: u32,
}
// Archive bytes stored without a schema-version byte: the on-disk format
// carries only a trailing compression marker (see the `Deserialize` impl
// below), in contrast with the versioned `Archive<T>` wrapper above.
#[derive(Debug, Clone)]
pub struct UnversionedArchive<T> {
    // Raw (possibly re-aligned) archive contents.
    pub inner: T,
}
#[derive(Debug, Clone)]
pub enum AlignedBytes {
Aligned(AlignedVec<ARCHIVE_ALIGNMENT>),
@ -60,6 +65,18 @@ where
>,
>;
// Serializing counterpart of `UnversionedArchive`: wraps an rkyv-serializable
// value and writes it without a schema-version byte (its `Serialize` impl
// appends only a compression marker). `#[repr(transparent)]` guarantees the
// wrapper has the same layout as the wrapped value.
#[repr(transparent)]
pub struct UnversionedArchiver<T>(pub T)
where
    T: rkyv::Archive
        + for<'a> rkyv::Serialize<
            rkyv::api::high::HighSerializer<
                rkyv::util::AlignedVec,
                rkyv::ser::allocator::ArenaHandle<'a>,
                rkyv::rancor::Error,
            >,
        >;
#[derive(Debug, Default)]
pub struct AssignedIds {
pub counter_ids: Vec<i64>,

View file

@ -4,7 +4,9 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::{ARCHIVE_ALIGNMENT, AlignedBytes, Archive, Archiver};
use super::{
ARCHIVE_ALIGNMENT, AlignedBytes, Archive, Archiver, UnversionedArchive, UnversionedArchiver,
};
use crate::{Deserialize, Serialize, SerializeInfallible, SerializedVersion, U32_LEN, Value};
use compact_str::format_compact;
use rkyv::util::AlignedVec;
@ -118,6 +120,71 @@ impl Deserialize for Archive<AlignedBytes> {
}
}
// Decodes stored bytes into an `UnversionedArchive`. Wire format: the last
// byte is a marker (uncompressed vs. LZ4-compressed); everything before it is
// the payload. There is no version byte — unlike `Archive<AlignedBytes>`.
impl Deserialize for UnversionedArchive<AlignedBytes> {
    // Borrowed-slice path: always copies the payload into an aligned buffer
    // (rkyv access requires ARCHIVE_ALIGNMENT-aligned memory).
    fn deserialize(bytes: &[u8]) -> trc::Result<Self> {
        // Split off the trailing marker byte; empty input is corruption.
        let (marker, contents) = bytes.split_last().ok_or_else(|| {
            trc::StoreEvent::DataCorruption
                .into_err()
                .details("Archive integrity compromised")
                .ctx(trc::Key::Value, bytes)
                .caused_by(trc::location!())
        })?;
        match marker & MARKER_MASK {
            ARCHIVE_UNCOMPRESSED => {
                // Copy into an AlignedVec so downstream rkyv access is valid.
                let mut bytes = AlignedVec::with_capacity(contents.len());
                bytes.extend_from_slice(contents);
                Ok(UnversionedArchive {
                    inner: AlignedBytes::Aligned(bytes),
                })
            }
            ARCHIVE_LZ4_COMPRESSED => {
                // Decompress into an aligned buffer.
                aligned_lz4_deflate(contents).map(|inner| UnversionedArchive { inner })
            }
            _ => Err(trc::StoreEvent::DataCorruption
                .into_err()
                .details("Invalid archive marker.")
                .ctx(trc::Key::Value, bytes)
                .caused_by(trc::location!())),
        }
    }

    // Owned-Vec path: avoids the copy when the Vec's buffer already happens
    // to satisfy the required alignment.
    fn deserialize_owned(mut bytes: Vec<u8>) -> trc::Result<Self> {
        let (marker, contents) = bytes.split_last().ok_or_else(|| {
            trc::StoreEvent::DataCorruption
                .into_err()
                .details("Archive integrity compromised")
                .ctx(trc::Key::Value, bytes.as_slice())
                .caused_by(trc::location!())
        })?;
        match marker & MARKER_MASK {
            ARCHIVE_UNCOMPRESSED => {
                // Drop the marker byte, keeping the payload in place.
                bytes.truncate(contents.len());
                if bytes.as_ptr().addr() & (ARCHIVE_ALIGNMENT - 1) == 0 {
                    // Buffer is already suitably aligned: reuse it as-is.
                    Ok(UnversionedArchive {
                        inner: AlignedBytes::Vec(bytes),
                    })
                } else {
                    // Misaligned allocation: fall back to an aligned copy.
                    let mut aligned = AlignedVec::with_capacity(bytes.len());
                    aligned.extend_from_slice(&bytes);
                    Ok(UnversionedArchive {
                        inner: AlignedBytes::Aligned(aligned),
                    })
                }
            }
            ARCHIVE_LZ4_COMPRESSED => {
                aligned_lz4_deflate(contents).map(|inner| UnversionedArchive { inner })
            }
            // NOTE(review): this error message lacks the trailing period used
            // by the borrowed-slice path above — harmless, but inconsistent.
            _ => Err(trc::StoreEvent::DataCorruption
                .into_err()
                .details("Invalid archive marker")
                .ctx(trc::Key::Value, bytes)
                .caused_by(trc::location!())),
        }
    }
}
#[inline]
fn aligned_lz4_deflate(archive: &[u8]) -> trc::Result<AlignedBytes> {
lz4_flex::block::uncompressed_size(archive)
@ -192,6 +259,54 @@ where
}
}
// Serializes the wrapped value with rkyv and appends a one-byte marker.
// Output layout:
//   uncompressed: [rkyv bytes][ARCHIVE_UNCOMPRESSED]
//   compressed:   [u32 LE uncompressed len][LZ4 block][ARCHIVE_LZ4_COMPRESSED]
// No schema-version byte is written (contrast with the versioned `Archiver`).
impl<T> Serialize for UnversionedArchiver<T>
where
    T: rkyv::Archive
        + for<'a> rkyv::Serialize<
            rkyv::api::high::HighSerializer<
                rkyv::util::AlignedVec,
                rkyv::ser::allocator::ArenaHandle<'a>,
                rkyv::rancor::Error,
            >,
        >,
{
    fn serialize(&self) -> trc::Result<Vec<u8>> {
        rkyv::to_bytes::<rkyv::rancor::Error>(&self.0)
            .map_err(|err| {
                // NOTE(review): reports `DeserializeError` for a
                // serialization failure — confirm whether a dedicated
                // serialize-error event exists.
                trc::StoreEvent::DeserializeError
                    .caused_by(trc::location!())
                    .reason(err)
            })
            .map(|input| {
                let input = input.as_ref();
                let input_len = input.len();
                if input_len > COMPRESS_WATERMARK {
                    // Pre-fill the whole buffer with the compressed marker so
                    // the byte just past the compressed payload already holds
                    // the correct trailing marker after truncation.
                    let mut bytes =
                        vec![
                            ARCHIVE_LZ4_COMPRESSED;
                            lz4_flex::block::get_maximum_output_size(input_len) + U32_LEN + 1
                        ];
                    let compressed_len =
                        lz4_flex::compress_into(input, &mut bytes[U32_LEN..]).unwrap();
                    if compressed_len < input_len {
                        // Compression helped: prepend the uncompressed length
                        // (needed by the decompressor) and trim the slack.
                        bytes[..U32_LEN].copy_from_slice(&(input_len as u32).to_le_bytes());
                        bytes.truncate(compressed_len + U32_LEN + 1);
                    } else {
                        // Incompressible data: store it raw instead.
                        bytes.clear();
                        bytes.extend_from_slice(input);
                        bytes.push(ARCHIVE_UNCOMPRESSED);
                    }
                    bytes
                } else {
                    // Small payloads are never compressed.
                    let mut bytes = Vec::with_capacity(input_len + 1);
                    bytes.extend_from_slice(input);
                    bytes.push(ARCHIVE_UNCOMPRESSED);
                    bytes
                }
            })
    }
}
impl Archive<AlignedBytes> {
#[inline]
pub fn as_bytes(&self) -> &[u8] {
@ -318,6 +433,49 @@ impl Archive<AlignedBytes> {
}
}
// Read-side accessors mirroring those on `Archive<AlignedBytes>`, minus any
// version handling (unversioned archives carry no version byte to check).
impl UnversionedArchive<AlignedBytes> {
    /// Returns the decoded archive payload as a byte slice.
    #[inline]
    pub fn as_bytes(&self) -> &[u8] {
        match &self.inner {
            AlignedBytes::Vec(bytes) => bytes.as_slice(),
            AlignedBytes::Aligned(bytes) => bytes.as_slice(),
        }
    }

    /// Zero-copy access: validates the bytes and returns a reference to the
    /// archived form of `T` without deserializing it.
    pub fn unarchive<T>(&self) -> trc::Result<&<T as rkyv::Archive>::Archived>
    where
        T: rkyv::Archive,
        T::Archived: for<'a> rkyv::bytecheck::CheckBytes<
            rkyv::api::high::HighValidator<'a, rkyv::rancor::Error>,
        > + rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<rkyv::rancor::Error>>,
    {
        rkyv::access::<T::Archived, rkyv::rancor::Error>(self.as_bytes()).map_err(|err| {
            trc::StoreEvent::DeserializeError
                .ctx(trc::Key::Value, self.as_bytes())
                .details("Archive access failed")
                .caused_by(trc::location!())
                .reason(err)
        })
    }

    /// Fully deserializes the archived bytes into an owned `T`
    /// (validates first via [`Self::unarchive`]).
    pub fn deserialize<T>(&self) -> trc::Result<T>
    where
        T: rkyv::Archive,
        T::Archived: for<'a> rkyv::bytecheck::CheckBytes<
            rkyv::api::high::HighValidator<'a, rkyv::rancor::Error>,
        > + rkyv::Deserialize<T, rkyv::api::high::HighDeserializer<rkyv::rancor::Error>>,
    {
        self.unarchive::<T>().and_then(|input| {
            rkyv::deserialize(input).map_err(|err| {
                trc::StoreEvent::DeserializeError
                    .ctx(trc::Key::Value, self.as_bytes())
                    .caused_by(trc::location!())
                    .reason(err)
            })
        })
    }
}
impl<T> Archiver<T>
where
T: rkyv::Archive
@ -338,6 +496,26 @@ where
}
}
impl<T> UnversionedArchiver<T>
where
T: rkyv::Archive
+ for<'a> rkyv::Serialize<
rkyv::api::high::HighSerializer<
rkyv::util::AlignedVec,
rkyv::ser::allocator::ArenaHandle<'a>,
rkyv::rancor::Error,
>,
>,
{
pub fn new(inner: T) -> Self {
Self(inner)
}
pub fn into_inner(self) -> T {
self.0
}
}
impl<T> Archive<&T>
where
T: rkyv::Portable
@ -486,3 +664,9 @@ impl<T> From<Value<'static>> for Archive<T> {
unimplemented!()
}
}
// NOTE(review): mirrors the `From<Value<'static>> for Archive<T>` impl above
// — presumably present only to satisfy a trait bound, since the conversion
// is never performed: calling it panics via `unimplemented!()`.
impl<T> From<Value<'static>> for UnversionedArchive<T> {
    fn from(_: Value<'static>) -> Self {
        unimplemented!()
    }
}

View file

@ -15,7 +15,6 @@ use std::{
#[derive(Debug)]
pub struct SnowflakeIdGenerator {
epoch: SystemTime,
last_timestamp: AtomicU64,
node_id: u64,
sequence: AtomicU64,
}
@ -31,7 +30,7 @@ const NODE_ID_MASK: u64 = (1 << NODE_ID_LEN) - 1;
const DEFAULT_EPOCH: u64 = 1632280000; // 52 years after UNIX_EPOCH
const DEFAULT_EPOCH_MS: u128 = (DEFAULT_EPOCH as u128) * 1000; // 52 years after UNIX_EPOCH in milliseconds
const MAX_CLOCK_DRIFT: i64 = 60 * 1000; // 1 minute
const MAX_CLOCK_DRIFT: i64 = 2000; // 2 seconds
static LOGICAL_TIME: AtomicU64 = AtomicU64::new(0);
static CHANGE_SEQ: AtomicU64 = AtomicU64::new(0);
@ -73,7 +72,6 @@ impl SnowflakeIdGenerator {
epoch: SystemTime::UNIX_EPOCH + Duration::from_secs(DEFAULT_EPOCH), // 52 years after UNIX_EPOCH
node_id,
sequence: 0.into(),
last_timestamp: 0.into(),
}
}
@ -92,22 +90,15 @@ impl SnowflakeIdGenerator {
#[inline(always)]
pub fn generate(&self) -> u64 {
let current_elapsed = self
let elapsed = self
.epoch
.elapsed()
.map(|e| e.as_millis())
.unwrap_or_default() as u64;
let last_elapsed = self
.last_timestamp
.fetch_max(current_elapsed, Ordering::Relaxed);
let (elapsed, sequence) = if current_elapsed > last_elapsed {
(current_elapsed, 0)
} else {
(last_elapsed, self.sequence.fetch_add(1, Ordering::Relaxed))
};
let sequence = self.sequence.fetch_add(1, Ordering::Relaxed) & SEQUENCE_MASK;
(elapsed << (SEQUENCE_LEN + NODE_ID_LEN))
| ((sequence & SEQUENCE_MASK) << NODE_ID_LEN)
| (sequence << NODE_ID_LEN)
| (self.node_id & NODE_ID_MASK)
}
}
@ -139,20 +130,18 @@ impl HlcTimestamp {
pub fn generate() -> u64 {
let node_id = *NODE_MUM;
let current_elapsed = SystemTime::UNIX_EPOCH
let elapsed = SystemTime::UNIX_EPOCH
.elapsed()
.map(|e| e.as_millis())
.unwrap_or_default()
.saturating_sub(DEFAULT_EPOCH_MS) as u64;
let last_elapsed = LOGICAL_TIME.fetch_max(current_elapsed, Ordering::SeqCst);
let (elapsed, sequence) = if current_elapsed > last_elapsed {
(current_elapsed, 0)
} else {
(last_elapsed, CHANGE_SEQ.fetch_add(1, Ordering::Relaxed))
};
let elapsed = LOGICAL_TIME
.fetch_max(elapsed, Ordering::SeqCst)
.max(elapsed);
let sequence = CHANGE_SEQ.fetch_add(1, Ordering::Relaxed) & SEQUENCE_MASK;
(elapsed << (SEQUENCE_LEN + NODE_ID_LEN))
| ((sequence & SEQUENCE_MASK) << NODE_ID_LEN)
| (sequence << NODE_ID_LEN)
| (node_id as u64 & NODE_ID_MASK)
}
}
@ -168,7 +157,6 @@ impl Clone for SnowflakeIdGenerator {
Self {
epoch: self.epoch,
node_id: self.node_id,
last_timestamp: 0.into(),
sequence: 0.into(),
}
}

View file

@ -452,9 +452,10 @@ impl AssertResult for Vec<String> {
}
if match_all && match_count != self.len() - 1 {
panic!(
"Expected {} mailboxes, but got {}",
"Expected {} mailboxes, but got {}: {:?}",
match_count,
self.len() - 1
self.len() - 1,
self.iter().collect::<Vec<_>>()
);
}
self

View file

@ -101,7 +101,7 @@ async fn jmap_tests_() {
.await;
webhooks::test(&mut params).await;
email_query::test(&mut params, delete).await;
/*email_query::test(&mut params, delete).await;
email_get::test(&mut params).await;
email_set::test(&mut params).await;
email_parse::test(&mut params).await;
@ -113,7 +113,7 @@ async fn jmap_tests_() {
thread_merge::test(&mut params).await;
mailbox::test(&mut params).await;
delivery::test(&mut params).await;
auth_acl::test(&mut params).await;
auth_acl::test(&mut params).await;*/
auth_limits::test(&mut params).await;
auth_oauth::test(&mut params).await;
event_source::test(&mut params).await;