mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2025-12-11 22:06:31 +08:00
Reset IMAP UIDs endpoint
This commit is contained in:
parent
16f9f3fa1b
commit
7dfaa5bf11
19 changed files with 204 additions and 90 deletions
|
|
@ -17,7 +17,7 @@ use jmap_proto::types::{collection::Collection, property::Property};
|
|||
use store::{
|
||||
write::{
|
||||
key::DeserializeBigEndian, AnyKey, BitmapClass, BitmapHash, BlobOp, DirectoryClass,
|
||||
LookupClass, QueueClass, QueueEvent, TagValue, ValueClass,
|
||||
InMemoryClass, QueueClass, QueueEvent, TagValue, ValueClass,
|
||||
},
|
||||
BitmapKey, Deserialize, IndexKey, IterateParams, LogKey, Serialize, ValueKey,
|
||||
SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, U32_LEN, U64_LEN,
|
||||
|
|
@ -538,13 +538,13 @@ impl Core {
|
|||
account_id: 0,
|
||||
collection: 0,
|
||||
document_id: 0,
|
||||
class: ValueClass::Lookup(LookupClass::Key(vec![0])),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(vec![0])),
|
||||
},
|
||||
ValueKey {
|
||||
account_id: u32::MAX,
|
||||
collection: u8::MAX,
|
||||
document_id: u32::MAX,
|
||||
class: ValueClass::Lookup(LookupClass::Key(vec![
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(vec![
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
|
|
@ -578,13 +578,13 @@ impl Core {
|
|||
account_id: 0,
|
||||
collection: 0,
|
||||
document_id: 0,
|
||||
class: ValueClass::Lookup(LookupClass::Counter(vec![0])),
|
||||
class: ValueClass::InMemory(InMemoryClass::Counter(vec![0])),
|
||||
},
|
||||
ValueKey {
|
||||
account_id: u32::MAX,
|
||||
collection: u8::MAX,
|
||||
document_id: u32::MAX,
|
||||
class: ValueClass::Lookup(LookupClass::Counter(vec![
|
||||
class: ValueClass::InMemory(InMemoryClass::Counter(vec![
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
u8::MAX,
|
||||
|
|
@ -611,9 +611,9 @@ impl Core {
|
|||
|
||||
for key in counters {
|
||||
let value = store
|
||||
.get_counter(ValueKey::from(ValueClass::Lookup(LookupClass::Counter(
|
||||
key.clone(),
|
||||
))))
|
||||
.get_counter(ValueKey::from(ValueClass::InMemory(
|
||||
InMemoryClass::Counter(key.clone()),
|
||||
)))
|
||||
.await
|
||||
.failed("Failed to get counter");
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ use store::{
|
|||
roaring::RoaringBitmap,
|
||||
write::{
|
||||
key::DeserializeBigEndian, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass,
|
||||
LookupClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue, TaskQueueClass,
|
||||
InMemoryClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue, TaskQueueClass,
|
||||
ValueClass,
|
||||
},
|
||||
BlobStore, Serialize, Store, U32_LEN,
|
||||
|
|
@ -167,11 +167,11 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
|
|||
batch.set(ValueClass::Config(key), value);
|
||||
}
|
||||
Family::LookupValue => {
|
||||
batch.set(ValueClass::Lookup(LookupClass::Key(key)), value);
|
||||
batch.set(ValueClass::InMemory(InMemoryClass::Key(key)), value);
|
||||
}
|
||||
Family::LookupCounter => {
|
||||
batch.add(
|
||||
ValueClass::Lookup(LookupClass::Counter(key)),
|
||||
ValueClass::InMemory(InMemoryClass::Counter(key)),
|
||||
i64::deserialize(&value).expect("Failed to deserialize counter"),
|
||||
);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -225,7 +225,7 @@ impl<T: SessionStream> SessionData<T> {
|
|||
}
|
||||
}
|
||||
|
||||
// Perepare write batch
|
||||
// Prepare write batch
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
|
|
|
|||
|
|
@ -16,7 +16,13 @@ use directory::{
|
|||
Permission,
|
||||
};
|
||||
use hyper::Method;
|
||||
use jmap_proto::{
|
||||
object::{index::ObjectIndexBuilder, Object},
|
||||
types::{collection::Collection, property::Property, value::Value},
|
||||
};
|
||||
use serde_json::json;
|
||||
use store::write::{assert::HashedValue, BatchBuilder, ValueClass, F_VALUE};
|
||||
use trc::AddContext;
|
||||
use utils::url_params::UrlParams;
|
||||
|
||||
use crate::{
|
||||
|
|
@ -24,7 +30,10 @@ use crate::{
|
|||
http::{HttpSessionData, ToHttpResponse},
|
||||
HttpRequest, HttpResponse, JsonResponse,
|
||||
},
|
||||
email::ingest::EmailIngest,
|
||||
mailbox::{set::SCHEMA, UidMailbox},
|
||||
services::index::Indexer,
|
||||
JmapMethods,
|
||||
};
|
||||
|
||||
use super::decode_path_element;
|
||||
|
|
@ -261,6 +270,22 @@ impl ManageStore for Server {
|
|||
}
|
||||
}
|
||||
// SPDX-SnippetEnd
|
||||
(Some("uids"), Some(account_id), None, &Method::DELETE) => {
|
||||
let account_id = self
|
||||
.core
|
||||
.storage
|
||||
.data
|
||||
.get_principal_id(decode_path_element(account_id).as_ref())
|
||||
.await?
|
||||
.ok_or_else(|| trc::ManageEvent::NotFound.into_err())?;
|
||||
|
||||
let result = reset_imap_uids(self, account_id).await?;
|
||||
|
||||
Ok(JsonResponse::new(json!({
|
||||
"data": result,
|
||||
}))
|
||||
.into_http_response())
|
||||
}
|
||||
_ => Err(trc::ResourceEvent::NotFound.into_err()),
|
||||
}
|
||||
}
|
||||
|
|
@ -283,3 +308,91 @@ impl ManageStore for Server {
|
|||
.into_http_response())
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn reset_imap_uids(server: &Server, account_id: u32) -> trc::Result<(u32, u32)> {
|
||||
let mut mailbox_count = 0;
|
||||
let mut email_count = 0;
|
||||
|
||||
for mailbox_id in server
|
||||
.get_document_ids(account_id, Collection::Mailbox)
|
||||
.await?
|
||||
.unwrap_or_default()
|
||||
{
|
||||
let mailbox = server
|
||||
.get_property::<HashedValue<Object<Value>>>(
|
||||
account_id,
|
||||
Collection::Mailbox,
|
||||
mailbox_id,
|
||||
Property::Value,
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.ok_or_else(|| trc::ImapEvent::Error.into_err().caused_by(trc::location!()))?;
|
||||
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Mailbox)
|
||||
.update_document(mailbox_id)
|
||||
.custom(
|
||||
ObjectIndexBuilder::new(SCHEMA)
|
||||
.with_current(mailbox)
|
||||
.with_changes(Object::with_capacity(1).with_property(
|
||||
Property::Cid,
|
||||
Value::UnsignedInt(rand::random::<u32>() as u64),
|
||||
)),
|
||||
)
|
||||
.clear(Property::EmailIds);
|
||||
server
|
||||
.write_batch(batch)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
mailbox_count += 1;
|
||||
}
|
||||
|
||||
// Reset all UIDs
|
||||
for message_id in server
|
||||
.get_document_ids(account_id, Collection::Email)
|
||||
.await
|
||||
.caused_by(trc::location!())?
|
||||
.unwrap_or_default()
|
||||
{
|
||||
let uids = server
|
||||
.get_property::<HashedValue<Vec<UidMailbox>>>(
|
||||
account_id,
|
||||
Collection::Email,
|
||||
message_id,
|
||||
Property::MailboxIds,
|
||||
)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
let mut uids = if let Some(uids) = uids.filter(|uids| !uids.inner.is_empty()) {
|
||||
uids
|
||||
} else {
|
||||
continue;
|
||||
};
|
||||
|
||||
for uid_mailbox in &mut uids.inner {
|
||||
uid_mailbox.uid = server
|
||||
.assign_imap_uid(account_id, uid_mailbox.mailbox_id)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
}
|
||||
|
||||
// Prepare write batch
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch
|
||||
.with_account_id(account_id)
|
||||
.with_collection(Collection::Email)
|
||||
.update_document(message_id)
|
||||
.assert_value(ValueClass::Property(Property::MailboxIds.into()), &uids)
|
||||
.value(Property::MailboxIds, uids.inner, F_VALUE);
|
||||
server
|
||||
.write_batch(batch)
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
email_count += 1;
|
||||
}
|
||||
|
||||
Ok((mailbox_count, email_count))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,7 +24,8 @@ use crate::{
|
|||
AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId, ValueOp,
|
||||
MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
|
||||
},
|
||||
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN, WITH_SUBSPACE,
|
||||
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_QUOTA,
|
||||
U32_LEN, WITH_SUBSPACE,
|
||||
};
|
||||
|
||||
use super::{
|
||||
|
|
@ -304,7 +305,7 @@ impl FdbStore {
|
|||
pub(crate) async fn purge_store(&self) -> trc::Result<()> {
|
||||
// Obtain all zero counters
|
||||
let mut delete_keys = Vec::new();
|
||||
for subspace in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
|
||||
for subspace in [SUBSPACE_COUNTER, SUBSPACE_QUOTA, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
let trx = self.db.create_trx().map_err(into_error)?;
|
||||
let from_key = [subspace, 0u8];
|
||||
let to_key = [subspace, u8::MAX, u8::MAX, u8::MAX, u8::MAX, u8::MAX];
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ impl MysqlStore {
|
|||
SUBSPACE_TASK_QUEUE,
|
||||
SUBSPACE_BLOB_RESERVE,
|
||||
SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_LOOKUP_VALUE,
|
||||
SUBSPACE_IN_MEMORY_VALUE,
|
||||
SUBSPACE_PROPERTY,
|
||||
SUBSPACE_SETTINGS,
|
||||
SUBSPACE_QUEUE_MESSAGE,
|
||||
|
|
@ -149,7 +149,7 @@ impl MysqlStore {
|
|||
.map_err(into_error)?;
|
||||
}
|
||||
|
||||
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
|
||||
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
conn.query_drop(format!(
|
||||
"CREATE TABLE IF NOT EXISTS {} (
|
||||
k TINYBLOB,
|
||||
|
|
|
|||
|
|
@ -16,8 +16,7 @@ use crate::{
|
|||
write::{
|
||||
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
|
||||
ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
|
||||
},
|
||||
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
|
||||
}, BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_QUOTA, U32_LEN
|
||||
};
|
||||
|
||||
use super::{into_error, MysqlStore};
|
||||
|
|
@ -322,7 +321,7 @@ impl MysqlStore {
|
|||
|
||||
pub(crate) async fn purge_store(&self) -> trc::Result<()> {
|
||||
let mut conn = self.conn_pool.get_conn().await.map_err(into_error)?;
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
let s = conn
|
||||
.prep(format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ impl PostgresStore {
|
|||
SUBSPACE_TASK_QUEUE,
|
||||
SUBSPACE_BLOB_RESERVE,
|
||||
SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_LOOKUP_VALUE,
|
||||
SUBSPACE_IN_MEMORY_VALUE,
|
||||
SUBSPACE_PROPERTY,
|
||||
SUBSPACE_SETTINGS,
|
||||
SUBSPACE_QUEUE_MESSAGE,
|
||||
|
|
@ -130,7 +130,7 @@ impl PostgresStore {
|
|||
.map_err(into_error)?;
|
||||
}
|
||||
|
||||
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
|
||||
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
conn.execute(
|
||||
&format!(
|
||||
"CREATE TABLE IF NOT EXISTS {} (
|
||||
|
|
|
|||
|
|
@ -17,8 +17,7 @@ use crate::{
|
|||
write::{
|
||||
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
|
||||
ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
|
||||
},
|
||||
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
|
||||
}, BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_QUOTA, U32_LEN
|
||||
};
|
||||
|
||||
use super::{into_error, PostgresStore};
|
||||
|
|
@ -335,7 +334,7 @@ impl PostgresStore {
|
|||
pub(crate) async fn purge_store(&self) -> trc::Result<()> {
|
||||
let conn = self.conn_pool.get().await.map_err(into_error)?;
|
||||
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
let s = conn
|
||||
.prepare_cached(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ impl RocksDbStore {
|
|||
}
|
||||
|
||||
// Counters
|
||||
for subspace in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
|
||||
for subspace in [SUBSPACE_COUNTER, SUBSPACE_QUOTA, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
let mut cf_opts = Options::default();
|
||||
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
|
||||
cfs.push(ColumnFamilyDescriptor::new(
|
||||
|
|
@ -77,7 +77,7 @@ impl RocksDbStore {
|
|||
SUBSPACE_TASK_QUEUE,
|
||||
SUBSPACE_BLOB_RESERVE,
|
||||
SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_LOOKUP_VALUE,
|
||||
SUBSPACE_IN_MEMORY_VALUE,
|
||||
SUBSPACE_PROPERTY,
|
||||
SUBSPACE_SETTINGS,
|
||||
SUBSPACE_QUEUE_MESSAGE,
|
||||
|
|
|
|||
|
|
@ -19,12 +19,10 @@ use rocksdb::{
|
|||
|
||||
use super::{into_error, CfHandle, RocksDbStore, CF_INDEXES, CF_LOGS};
|
||||
use crate::{
|
||||
backend::deserialize_i64_le,
|
||||
write::{
|
||||
backend::deserialize_i64_le, write::{
|
||||
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
|
||||
ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
|
||||
},
|
||||
BitmapKey, Deserialize, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
|
||||
}, BitmapKey, Deserialize, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_QUOTA, U32_LEN
|
||||
};
|
||||
|
||||
impl RocksDbStore {
|
||||
|
|
@ -84,7 +82,7 @@ impl RocksDbStore {
|
|||
pub(crate) async fn purge_store(&self) -> trc::Result<()> {
|
||||
let db = self.db.clone();
|
||||
self.spawn_worker(move || {
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
let cf = db
|
||||
.cf_handle(std::str::from_utf8(&[subspace]).unwrap())
|
||||
.unwrap();
|
||||
|
|
|
|||
|
|
@ -94,7 +94,7 @@ impl SqliteStore {
|
|||
SUBSPACE_TASK_QUEUE,
|
||||
SUBSPACE_BLOB_RESERVE,
|
||||
SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_LOOKUP_VALUE,
|
||||
SUBSPACE_IN_MEMORY_VALUE,
|
||||
SUBSPACE_PROPERTY,
|
||||
SUBSPACE_SETTINGS,
|
||||
SUBSPACE_QUEUE_MESSAGE,
|
||||
|
|
@ -139,7 +139,7 @@ impl SqliteStore {
|
|||
.map_err(into_error)?;
|
||||
}
|
||||
|
||||
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
|
||||
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
conn.execute(
|
||||
&format!(
|
||||
"CREATE TABLE IF NOT EXISTS {} (
|
||||
|
|
|
|||
|
|
@ -11,8 +11,7 @@ use crate::{
|
|||
write::{
|
||||
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
|
||||
ValueOp,
|
||||
},
|
||||
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
|
||||
}, BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_QUOTA, U32_LEN
|
||||
};
|
||||
|
||||
use super::{into_error, SqliteStore};
|
||||
|
|
@ -259,7 +258,7 @@ impl SqliteStore {
|
|||
pub(crate) async fn purge_store(&self) -> trc::Result<()> {
|
||||
let conn = self.conn_pool.get().map_err(into_error)?;
|
||||
self.spawn_worker(move || {
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
|
||||
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER, SUBSPACE_IN_MEMORY_COUNTER] {
|
||||
conn.prepare_cached(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))
|
||||
.map_err(into_error)?
|
||||
.execute([])
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use std::borrow::Cow;
|
|||
use trc::AddContext;
|
||||
use utils::config::Rate;
|
||||
|
||||
use crate::{backend::http::lookup::HttpStoreGet, write::LookupClass};
|
||||
use crate::{backend::http::lookup::HttpStoreGet, write::InMemoryClass};
|
||||
#[allow(unused_imports)]
|
||||
use crate::{
|
||||
write::{
|
||||
|
|
@ -31,7 +31,7 @@ impl InMemoryStore {
|
|||
InMemoryStore::Store(store) => {
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(kv.key)),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(kv.key)),
|
||||
op: ValueOp::Set(
|
||||
KeySerializer::new(kv.value.len() + U64_LEN)
|
||||
.write(kv.expires.map_or(u64::MAX, |expires| now() + expires))
|
||||
|
|
@ -60,7 +60,7 @@ impl InMemoryStore {
|
|||
|
||||
if let Some(expires) = kv.expires {
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(kv.key.clone())),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(kv.key.clone())),
|
||||
op: ValueOp::Set(
|
||||
KeySerializer::new(U64_LEN * 2)
|
||||
.write(0u64)
|
||||
|
|
@ -72,7 +72,7 @@ impl InMemoryStore {
|
|||
}
|
||||
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Counter(kv.key)),
|
||||
class: ValueClass::InMemory(InMemoryClass::Counter(kv.key)),
|
||||
op: ValueOp::AddAndGet(kv.value),
|
||||
});
|
||||
|
||||
|
|
@ -97,7 +97,7 @@ impl InMemoryStore {
|
|||
InMemoryStore::Store(store) => {
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(key.into().into_bytes())),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(key.into().into_bytes())),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
store.write(batch.build()).await.map(|_| ())
|
||||
|
|
@ -118,7 +118,7 @@ impl InMemoryStore {
|
|||
InMemoryStore::Store(store) => {
|
||||
let mut batch = BatchBuilder::new();
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Counter(key.into().into_bytes())),
|
||||
class: ValueClass::InMemory(InMemoryClass::Counter(key.into().into_bytes())),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
store.write(batch.build()).await.map(|_| ())
|
||||
|
|
@ -148,17 +148,19 @@ impl InMemoryStore {
|
|||
|
||||
store
|
||||
.delete_range(
|
||||
ValueKey::from(ValueClass::Lookup(LookupClass::Counter(
|
||||
ValueKey::from(ValueClass::InMemory(InMemoryClass::Counter(
|
||||
from_range.clone(),
|
||||
))),
|
||||
ValueKey::from(ValueClass::Lookup(LookupClass::Counter(to_range.clone()))),
|
||||
ValueKey::from(ValueClass::InMemory(InMemoryClass::Counter(
|
||||
to_range.clone(),
|
||||
))),
|
||||
)
|
||||
.await?;
|
||||
|
||||
store
|
||||
.delete_range(
|
||||
ValueKey::from(ValueClass::Lookup(LookupClass::Key(from_range))),
|
||||
ValueKey::from(ValueClass::Lookup(LookupClass::Key(to_range))),
|
||||
ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(from_range))),
|
||||
ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(to_range))),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
|
@ -179,9 +181,9 @@ impl InMemoryStore {
|
|||
) -> trc::Result<Option<T>> {
|
||||
match self {
|
||||
InMemoryStore::Store(store) => store
|
||||
.get_value::<LookupValue<T>>(ValueKey::from(ValueClass::Lookup(LookupClass::Key(
|
||||
key.into().into_bytes(),
|
||||
))))
|
||||
.get_value::<LookupValue<T>>(ValueKey::from(ValueClass::InMemory(
|
||||
InMemoryClass::Key(key.into().into_bytes()),
|
||||
)))
|
||||
.await
|
||||
.map(|value| value.and_then(|v| v.into())),
|
||||
#[cfg(feature = "redis")]
|
||||
|
|
@ -202,9 +204,9 @@ impl InMemoryStore {
|
|||
match self {
|
||||
InMemoryStore::Store(store) => {
|
||||
store
|
||||
.get_counter(ValueKey::from(ValueClass::Lookup(LookupClass::Counter(
|
||||
key.into().into_bytes(),
|
||||
))))
|
||||
.get_counter(ValueKey::from(ValueClass::InMemory(
|
||||
InMemoryClass::Counter(key.into().into_bytes()),
|
||||
)))
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "redis")]
|
||||
|
|
@ -221,9 +223,9 @@ impl InMemoryStore {
|
|||
pub async fn key_exists(&self, key: impl Into<LookupKey<'_>>) -> trc::Result<bool> {
|
||||
match self {
|
||||
InMemoryStore::Store(store) => store
|
||||
.get_value::<LookupValue<()>>(ValueKey::from(ValueClass::Lookup(LookupClass::Key(
|
||||
key.into().into_bytes(),
|
||||
))))
|
||||
.get_value::<LookupValue<()>>(ValueKey::from(ValueClass::InMemory(
|
||||
InMemoryClass::Key(key.into().into_bytes()),
|
||||
)))
|
||||
.await
|
||||
.map(|value| matches!(value, Some(LookupValue::Value(())))),
|
||||
#[cfg(feature = "redis")]
|
||||
|
|
@ -283,9 +285,9 @@ impl InMemoryStore {
|
|||
match self {
|
||||
InMemoryStore::Store(store) => {
|
||||
// Delete expired keys and counters
|
||||
let from_key = ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![0u8])));
|
||||
let from_key = ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(vec![0u8])));
|
||||
let to_key =
|
||||
ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![u8::MAX; 10])));
|
||||
ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(vec![u8::MAX; 10])));
|
||||
|
||||
let current_time = now();
|
||||
let mut expired_keys = Vec::new();
|
||||
|
|
@ -313,7 +315,7 @@ impl InMemoryStore {
|
|||
let mut batch = BatchBuilder::new();
|
||||
for key in expired_keys {
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(key)),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(key)),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
if batch.ops.len() >= 1000 {
|
||||
|
|
@ -336,11 +338,11 @@ impl InMemoryStore {
|
|||
let mut batch = BatchBuilder::new();
|
||||
for key in expired_counters {
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Counter(key.clone())),
|
||||
class: ValueClass::InMemory(InMemoryClass::Counter(key.clone())),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(key)),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(key)),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
if batch.ops.len() >= 1000 {
|
||||
|
|
|
|||
|
|
@ -595,7 +595,7 @@ impl Store {
|
|||
SUBSPACE_BLOB_RESERVE,
|
||||
SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_LOGS,
|
||||
SUBSPACE_LOOKUP_VALUE,
|
||||
SUBSPACE_IN_MEMORY_VALUE,
|
||||
SUBSPACE_COUNTER,
|
||||
SUBSPACE_PROPERTY,
|
||||
SUBSPACE_SETTINGS,
|
||||
|
|
@ -694,11 +694,11 @@ impl Store {
|
|||
|
||||
#[cfg(feature = "test_mode")]
|
||||
pub async fn lookup_expire_all(&self) {
|
||||
use crate::write::LookupClass;
|
||||
use crate::write::InMemoryClass;
|
||||
|
||||
// Delete all temporary counters
|
||||
let from_key = ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![0u8])));
|
||||
let to_key = ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![u8::MAX; 10])));
|
||||
let from_key = ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(vec![0u8])));
|
||||
let to_key = ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(vec![u8::MAX; 10])));
|
||||
|
||||
let mut expired_keys = Vec::new();
|
||||
let mut expired_counters = Vec::new();
|
||||
|
|
@ -719,7 +719,7 @@ impl Store {
|
|||
let mut batch = BatchBuilder::new();
|
||||
for key in expired_keys {
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(key)),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(key)),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
if batch.ops.len() >= 1000 {
|
||||
|
|
@ -736,11 +736,11 @@ impl Store {
|
|||
let mut batch = BatchBuilder::new();
|
||||
for key in expired_counters {
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Counter(key.clone())),
|
||||
class: ValueClass::InMemory(InMemoryClass::Counter(key.clone())),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
batch.ops.push(Operation::Value {
|
||||
class: ValueClass::Lookup(LookupClass::Key(key)),
|
||||
class: ValueClass::InMemory(InMemoryClass::Key(key)),
|
||||
op: ValueOp::Clear,
|
||||
});
|
||||
if batch.ops.len() >= 1000 {
|
||||
|
|
@ -773,7 +773,8 @@ impl Store {
|
|||
(SUBSPACE_ACL, true),
|
||||
//(SUBSPACE_DIRECTORY, true),
|
||||
(SUBSPACE_TASK_QUEUE, true),
|
||||
(SUBSPACE_LOOKUP_VALUE, true),
|
||||
(SUBSPACE_IN_MEMORY_VALUE, true),
|
||||
(SUBSPACE_IN_MEMORY_COUNTER, false),
|
||||
(SUBSPACE_PROPERTY, true),
|
||||
(SUBSPACE_SETTINGS, true),
|
||||
(SUBSPACE_QUEUE_MESSAGE, true),
|
||||
|
|
|
|||
|
|
@ -142,7 +142,8 @@ pub const SUBSPACE_BLOB_LINK: u8 = b'k';
|
|||
pub const SUBSPACE_BLOBS: u8 = b't';
|
||||
pub const SUBSPACE_LOGS: u8 = b'l';
|
||||
pub const SUBSPACE_COUNTER: u8 = b'n';
|
||||
pub const SUBSPACE_LOOKUP_VALUE: u8 = b'm';
|
||||
pub const SUBSPACE_IN_MEMORY_VALUE: u8 = b'm';
|
||||
pub const SUBSPACE_IN_MEMORY_COUNTER: u8 = b'y';
|
||||
pub const SUBSPACE_PROPERTY: u8 = b'p';
|
||||
pub const SUBSPACE_SETTINGS: u8 = b's';
|
||||
pub const SUBSPACE_QUEUE_MESSAGE: u8 = b'e';
|
||||
|
|
@ -155,7 +156,6 @@ pub const SUBSPACE_TELEMETRY_SPAN: u8 = b'o';
|
|||
pub const SUBSPACE_TELEMETRY_INDEX: u8 = b'w';
|
||||
pub const SUBSPACE_TELEMETRY_METRIC: u8 = b'x';
|
||||
|
||||
pub const SUBSPACE_RESERVED_1: u8 = b'y';
|
||||
pub const SUBSPACE_RESERVED_2: u8 = b'z';
|
||||
|
||||
#[derive(Clone)]
|
||||
|
|
|
|||
|
|
@ -11,15 +11,16 @@ use crate::{
|
|||
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, SUBSPACE_ACL,
|
||||
SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_BLOB_LINK,
|
||||
SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX,
|
||||
SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_VALUE, SUBSPACE_PROPERTY,
|
||||
SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REPORT_IN,
|
||||
SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TASK_QUEUE, SUBSPACE_TELEMETRY_INDEX,
|
||||
SUBSPACE_TELEMETRY_METRIC, SUBSPACE_TELEMETRY_SPAN, U32_LEN, U64_LEN, WITH_SUBSPACE,
|
||||
SUBSPACE_INDEXES, SUBSPACE_IN_MEMORY_COUNTER, SUBSPACE_IN_MEMORY_VALUE, SUBSPACE_LOGS,
|
||||
SUBSPACE_PROPERTY, SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA,
|
||||
SUBSPACE_REPORT_IN, SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TASK_QUEUE,
|
||||
SUBSPACE_TELEMETRY_INDEX, SUBSPACE_TELEMETRY_METRIC, SUBSPACE_TELEMETRY_SPAN, U32_LEN, U64_LEN,
|
||||
WITH_SUBSPACE,
|
||||
};
|
||||
|
||||
use super::{
|
||||
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, LookupClass, QueueClass, ReportClass,
|
||||
ReportEvent, ResolveId, TagValue, TaskQueueClass, TelemetryClass, ValueClass,
|
||||
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, InMemoryClass, QueueClass,
|
||||
ReportClass, ReportEvent, ResolveId, TagValue, TaskQueueClass, TelemetryClass, ValueClass,
|
||||
};
|
||||
|
||||
pub struct KeySerializer {
|
||||
|
|
@ -306,9 +307,9 @@ impl<T: ResolveId> ValueClass<T> {
|
|||
.write(*id as u32),
|
||||
},
|
||||
ValueClass::Config(key) => serializer.write(key.as_slice()),
|
||||
ValueClass::Lookup(lookup) => match lookup {
|
||||
LookupClass::Key(key) => serializer.write(key.as_slice()),
|
||||
LookupClass::Counter(key) => serializer.write(key.as_slice()),
|
||||
ValueClass::InMemory(lookup) => match lookup {
|
||||
InMemoryClass::Key(key) => serializer.write(key.as_slice()),
|
||||
InMemoryClass::Counter(key) => serializer.write(key.as_slice()),
|
||||
},
|
||||
ValueClass::Directory(directory) => match directory {
|
||||
DirectoryClass::NameToId(name) => serializer.write(0u8).write(name.as_slice()),
|
||||
|
|
@ -541,7 +542,7 @@ impl<T> ValueClass<T> {
|
|||
}
|
||||
}
|
||||
ValueClass::Acl(_) => U32_LEN * 3 + 2,
|
||||
ValueClass::Lookup(LookupClass::Counter(v) | LookupClass::Key(v))
|
||||
ValueClass::InMemory(InMemoryClass::Counter(v) | InMemoryClass::Key(v))
|
||||
| ValueClass::Config(v) => v.len(),
|
||||
ValueClass::Directory(d) => match d {
|
||||
DirectoryClass::NameToId(v) | DirectoryClass::EmailToId(v) => v.len(),
|
||||
|
|
@ -595,9 +596,9 @@ impl<T> ValueClass<T> {
|
|||
}
|
||||
},
|
||||
ValueClass::Config(_) => SUBSPACE_SETTINGS,
|
||||
ValueClass::Lookup(lookup) => match lookup {
|
||||
LookupClass::Key(_) => SUBSPACE_LOOKUP_VALUE,
|
||||
LookupClass::Counter(_) => SUBSPACE_COUNTER,
|
||||
ValueClass::InMemory(lookup) => match lookup {
|
||||
InMemoryClass::Key(_) => SUBSPACE_IN_MEMORY_VALUE,
|
||||
InMemoryClass::Counter(_) => SUBSPACE_IN_MEMORY_COUNTER,
|
||||
},
|
||||
ValueClass::Directory(directory) => match directory {
|
||||
DirectoryClass::UsedQuota(_) => SUBSPACE_QUOTA,
|
||||
|
|
@ -625,7 +626,7 @@ impl<T> ValueClass<T> {
|
|||
pub fn is_counter(&self, collection: u8) -> bool {
|
||||
match self {
|
||||
ValueClass::Directory(DirectoryClass::UsedQuota(_))
|
||||
| ValueClass::Lookup(LookupClass::Counter(_))
|
||||
| ValueClass::InMemory(InMemoryClass::Counter(_))
|
||||
| ValueClass::Queue(QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_)) => true,
|
||||
ValueClass::Property(84) if collection == 1 => true, // TODO: Find a more elegant way to do this
|
||||
_ => false,
|
||||
|
|
|
|||
|
|
@ -144,7 +144,7 @@ pub enum TagValue<T> {
|
|||
pub enum ValueClass<T> {
|
||||
Property(u8),
|
||||
Acl(u32),
|
||||
Lookup(LookupClass),
|
||||
InMemory(InMemoryClass),
|
||||
FtsIndex(BitmapHash),
|
||||
TaskQueue(TaskQueueClass),
|
||||
Directory(DirectoryClass<T>),
|
||||
|
|
@ -176,7 +176,7 @@ pub struct AnyClass {
|
|||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
|
||||
pub enum LookupClass {
|
||||
pub enum InMemoryClass {
|
||||
Key(Vec<u8>),
|
||||
Counter(Vec<u8>),
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ use jmap_proto::types::{collection::Collection, property::Property};
|
|||
use store::{
|
||||
rand,
|
||||
write::{
|
||||
AnyKey, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass, LookupClass,
|
||||
AnyKey, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass, InMemoryClass,
|
||||
MaybeDynamicId, MaybeDynamicValue, Operation, QueueClass, QueueEvent, TagValue, ValueClass,
|
||||
},
|
||||
*,
|
||||
|
|
@ -163,11 +163,11 @@ pub async fn test(db: Store) {
|
|||
random_bytes(idx),
|
||||
);
|
||||
batch.set(
|
||||
ValueClass::Lookup(LookupClass::Key(random_bytes(idx))),
|
||||
ValueClass::InMemory(InMemoryClass::Key(random_bytes(idx))),
|
||||
random_bytes(idx),
|
||||
);
|
||||
batch.add(
|
||||
ValueClass::Lookup(LookupClass::Counter(random_bytes(idx))),
|
||||
ValueClass::InMemory(InMemoryClass::Counter(random_bytes(idx))),
|
||||
rand::random(),
|
||||
);
|
||||
batch.set(
|
||||
|
|
@ -286,7 +286,8 @@ impl Snapshot {
|
|||
(SUBSPACE_BLOBS, true),
|
||||
(SUBSPACE_LOGS, true),
|
||||
(SUBSPACE_COUNTER, !is_sql),
|
||||
(SUBSPACE_LOOKUP_VALUE, true),
|
||||
(SUBSPACE_IN_MEMORY_COUNTER, !is_sql),
|
||||
(SUBSPACE_IN_MEMORY_VALUE, true),
|
||||
(SUBSPACE_PROPERTY, true),
|
||||
(SUBSPACE_SETTINGS, true),
|
||||
(SUBSPACE_QUEUE_MESSAGE, true),
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue