Use permanent links for blobs in the SMTP queue

This commit is contained in:
mdecimus 2024-05-09 09:09:12 +02:00
parent e10083651b
commit 7a25689288
5 changed files with 37 additions and 36 deletions

View file

@ -36,8 +36,6 @@ use super::{
}; };
pub const LOCK_EXPIRY: u64 = 300; pub const LOCK_EXPIRY: u64 = 300;
pub const BLOB_EXPIRY: u64 = 3600;
pub const SPOOL_ACCOUNT_ID: u32 = u32::MAX - 1;
#[derive(Debug)] #[derive(Debug)]
pub struct QueueEventLock { pub struct QueueEventLock {
@ -152,17 +150,12 @@ impl SMTP {
event = "locked", event = "locked",
id = event.queue_id, id = event.queue_id,
due = event.due, due = event.due,
"Failed to lock event: Event already locked." "Lock busy: Event already locked."
); );
None None
} }
Err(err) => { Err(err) => {
tracing::error!( tracing::error!(context = "queue", event = "error", "Lock error: {}", err);
context = "queue",
event = "error",
"Failed to lock event: {}",
err
);
None None
} }
} }
@ -219,10 +212,11 @@ impl Message {
// Reserve and write blob // Reserve and write blob
let mut batch = BatchBuilder::new(); let mut batch = BatchBuilder::new();
batch.with_account_id(SPOOL_ACCOUNT_ID).set( let reserve_until = now() + 120;
batch.set(
BlobOp::Reserve { BlobOp::Reserve {
hash: self.blob_hash.clone(), hash: self.blob_hash.clone(),
until: self.next_delivery_event() + BLOB_EXPIRY, until: reserve_until,
}, },
0u32.serialize(), 0u32.serialize(),
); );
@ -293,6 +287,17 @@ impl Message {
})), })),
0u64.serialize(), 0u64.serialize(),
) )
.clear(BlobOp::Reserve {
hash: self.blob_hash.clone(),
until: reserve_until,
})
.set(
BlobOp::LinkId {
hash: self.blob_hash.clone(),
id: self.id,
},
vec![],
)
.set( .set(
BlobOp::Commit { BlobOp::Commit {
hash: self.blob_hash.clone(), hash: self.blob_hash.clone(),
@ -411,14 +416,6 @@ impl Message {
); );
} }
batch.with_account_id(SPOOL_ACCOUNT_ID).set(
BlobOp::Reserve {
hash: self.blob_hash.clone(),
until: self.next_delivery_event() + BLOB_EXPIRY,
},
0u32.serialize(),
);
batch.set( batch.set(
ValueClass::Queue(QueueClass::Message(self.id)), ValueClass::Queue(QueueClass::Message(self.id)),
Bincode::new(self).serialize(), Bincode::new(self).serialize(),
@ -456,18 +453,10 @@ impl Message {
} }
batch batch
.with_account_id(SPOOL_ACCOUNT_ID) .clear(BlobOp::LinkId {
.clear(BlobOp::Reserve {
hash: self.blob_hash.clone(), hash: self.blob_hash.clone(),
until: prev_event + BLOB_EXPIRY, id: self.id,
}) })
.set(
BlobOp::Reserve {
hash: self.blob_hash.clone(),
until: now() - 1,
},
0u32.serialize(),
)
.clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent { .clear(ValueClass::Queue(QueueClass::MessageEvent(QueueEvent {
due: prev_event, due: prev_event,
queue_id: self.id, queue_id: self.id,

View file

@ -195,7 +195,7 @@ impl SMTP {
context = "queue", context = "queue",
event = "locked", event = "locked",
key = ?lock, key = ?lock,
"Failed to lock report: Event already locked." "Lock busy: Event already locked."
); );
false false
} }
@ -203,7 +203,7 @@ impl SMTP {
tracing::error!( tracing::error!(
context = "queue", context = "queue",
event = "error", event = "error",
"Failed to lock report: {}", "Lock busy: {}",
err err
); );
false false
@ -215,7 +215,7 @@ impl SMTP {
event = "locked", event = "locked",
key = ?lock, key = ?lock,
expiry = expiry - now, expiry = expiry - now,
"Failed to lock report: Report already locked." "Lock busy: Report already locked."
); );
false false
} }
@ -225,7 +225,7 @@ impl SMTP {
context = "queue", context = "queue",
event = "locked", event = "locked",
key = ?lock, key = ?lock,
"Failed to lock report: Report lock deleted." "Lock busy: Report lock deleted."
); );
false false
} }
@ -234,7 +234,7 @@ impl SMTP {
context = "queue", context = "queue",
event = "error", event = "error",
key = ?lock, key = ?lock,
"Failed to lock report: {}", "Lock error: {}",
err err
); );
false false

View file

@ -296,6 +296,11 @@ impl<T: ResolveId> ValueClass<T> {
.write(account_id) .write(account_id)
.write(collection) .write(collection)
.write(document_id), .write(document_id),
BlobOp::LinkId { hash, id } => serializer
.write::<&[u8]>(hash.as_ref())
.write((*id >> 32) as u32)
.write(u8::MAX)
.write(*id as u32),
}, },
ValueClass::Config(key) => serializer.write(key.as_slice()), ValueClass::Config(key) => serializer.write(key.as_slice()),
ValueClass::Lookup(lookup) => match lookup { ValueClass::Lookup(lookup) => match lookup {
@ -523,7 +528,9 @@ impl<T> ValueClass<T> {
}, },
ValueClass::Blob(op) => match op { ValueClass::Blob(op) => match op {
BlobOp::Reserve { .. } => BLOB_HASH_LEN + U64_LEN + U32_LEN + 1, BlobOp::Reserve { .. } => BLOB_HASH_LEN + U64_LEN + U32_LEN + 1,
BlobOp::Commit { .. } | BlobOp::Link { .. } => BLOB_HASH_LEN + U32_LEN * 2 + 2, BlobOp::Commit { .. } | BlobOp::Link { .. } | BlobOp::LinkId { .. } => {
BLOB_HASH_LEN + U32_LEN * 2 + 2
}
}, },
ValueClass::IndexEmail { .. } => BLOB_HASH_LEN + U64_LEN * 2, ValueClass::IndexEmail { .. } => BLOB_HASH_LEN + U64_LEN * 2,
ValueClass::Queue(q) => match q { ValueClass::Queue(q) => match q {
@ -555,7 +562,9 @@ impl<T> ValueClass<T> {
ValueClass::IndexEmail { .. } => SUBSPACE_FTS_INDEX, ValueClass::IndexEmail { .. } => SUBSPACE_FTS_INDEX,
ValueClass::Blob(op) => match op { ValueClass::Blob(op) => match op {
BlobOp::Reserve { .. } => SUBSPACE_BLOB_RESERVE, BlobOp::Reserve { .. } => SUBSPACE_BLOB_RESERVE,
BlobOp::Commit { .. } | BlobOp::Link { .. } => SUBSPACE_BLOB_LINK, BlobOp::Commit { .. } | BlobOp::Link { .. } | BlobOp::LinkId { .. } => {
SUBSPACE_BLOB_LINK
}
}, },
ValueClass::Config(_) => SUBSPACE_SETTINGS, ValueClass::Config(_) => SUBSPACE_SETTINGS,
ValueClass::Lookup(lookup) => match lookup { ValueClass::Lookup(lookup) => match lookup {

View file

@ -243,6 +243,7 @@ pub enum BlobOp {
Reserve { hash: BlobHash, until: u64 }, Reserve { hash: BlobHash, until: u64 },
Commit { hash: BlobHash }, Commit { hash: BlobHash },
Link { hash: BlobHash }, Link { hash: BlobHash },
LinkId { hash: BlobHash, id: u64 },
} }
#[derive(Debug, PartialEq, Clone, Eq, Hash)] #[derive(Debug, PartialEq, Clone, Eq, Hash)]

View file

@ -42,6 +42,8 @@ FROM debian:buster-slim AS runtime
COPY --from=builder /app/target/release/stalwart-mail /usr/local/bin/stalwart-mail COPY --from=builder /app/target/release/stalwart-mail /usr/local/bin/stalwart-mail
RUN apt-get update -y && apt-get install -yq ca-certificates RUN apt-get update -y && apt-get install -yq ca-certificates
RUN curl -LO https://github.com/apple/foundationdb/releases/download/7.1.0/foundationdb-clients_7.1.0-1_amd64.deb && \
dpkg -i foundationdb-clients_7.1.0-1_amd64.deb
RUN useradd stalwart-mail -s /sbin/nologin -M RUN useradd stalwart-mail -s /sbin/nologin -M
RUN mkdir -p /opt/stalwart-mail RUN mkdir -p /opt/stalwart-mail
RUN chown stalwart-mail:stalwart-mail /opt/stalwart-mail RUN chown stalwart-mail:stalwart-mail /opt/stalwart-mail