Individual subspaces for each value and bitmap type

This commit is contained in:
mdecimus 2024-04-27 10:47:36 +02:00
parent 98e13667d6
commit 9bebcd9b8c
26 changed files with 669 additions and 697 deletions

View file

@ -143,12 +143,9 @@ impl ConfigManager {
.iterate(
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
let mut key =
std::str::from_utf8(key.get(1..).unwrap_or_default()).map_err(|_| {
store::Error::InternalError(
"Failed to deserialize config key".to_string(),
)
})?;
let mut key = std::str::from_utf8(key).map_err(|_| {
store::Error::InternalError("Failed to deserialize config key".to_string())
})?;
if !patterns.is_local_key(key) {
if strip_prefix && !prefix.is_empty() {

View file

@ -190,7 +190,7 @@ impl JMAP {
if values {
result_values.push(Message::from(&message));
} else {
result_ids.push(key.deserialize_be_u64(1)?);
result_ids.push(key.deserialize_be_u64(0)?);
}
total_returned += 1;
}

View file

@ -93,8 +93,8 @@ impl SMTP {
IterateParams::new(from_key, to_key).ascending(),
|key, value| {
let event = QueueEventLock {
due: key.deserialize_be_u64(1)?,
queue_id: key.deserialize_be_u64(U64_LEN + 1)?,
due: key.deserialize_be_u64(0)?,
queue_id: key.deserialize_be_u64(U64_LEN)?,
lock_expiry: u64::deserialize(value)?,
};
let do_continue = event.due <= now;

View file

@ -41,7 +41,7 @@ use crate::{
AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId, ValueOp,
MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
},
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTERS, SUBSPACE_VALUES, U32_LEN, WITH_SUBSPACE,
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN, WITH_SUBSPACE,
};
use super::{
@ -87,7 +87,7 @@ impl FdbStore {
WITH_SUBSPACE,
(&result).into(),
);
let do_chunk = key[0] == SUBSPACE_VALUES;
let do_chunk = !class.is_counter(collection);
match op {
ValueOp::Set(value) => {
@ -293,31 +293,27 @@ impl FdbStore {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
// Obtain all zero counters
let mut delete_keys = Vec::new();
let trx = self.db.create_trx()?;
let mut iter = trx.get_ranges(
RangeOption {
begin: KeySelector::first_greater_or_equal(&[SUBSPACE_COUNTERS, 0u8][..]),
end: KeySelector::first_greater_or_equal(
&[
SUBSPACE_COUNTERS,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
][..],
),
mode: options::StreamingMode::WantAll,
reverse: false,
..Default::default()
},
true,
);
for subspace in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
let trx = self.db.create_trx()?;
let from_key = [subspace, 0u8];
let to_key = [subspace, u8::MAX, u8::MAX, u8::MAX, u8::MAX, u8::MAX];
while let Some(values) = iter.next().await {
for value in values? {
if value.value().iter().all(|byte| *byte == 0) {
delete_keys.push(value.key().to_vec());
let mut iter = trx.get_ranges(
RangeOption {
begin: KeySelector::first_greater_or_equal(&from_key[..]),
end: KeySelector::first_greater_or_equal(&to_key[..]),
mode: options::StreamingMode::WantAll,
reverse: false,
..Default::default()
},
true,
);
while let Some(values) = iter.next().await {
for value in values? {
if value.value().iter().all(|byte| *byte == 0) {
delete_keys.push(value.key().to_vec());
}
}
}
}

View file

@ -26,10 +26,7 @@ use std::time::Duration;
use mysql_async::{prelude::Queryable, OptsBuilder, Pool, PoolConstraints, PoolOpts, SslOpts};
use utils::config::{utils::AsKey, Config};
use crate::{
SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
SUBSPACE_VALUES,
};
use crate::*;
use super::MysqlStore;
@ -99,7 +96,23 @@ impl MysqlStore {
pub(super) async fn create_tables(&self) -> crate::Result<()> {
let mut conn = self.conn_pool.get_conn().await?;
for table in [SUBSPACE_VALUES, SUBSPACE_LOGS] {
for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
SUBSPACE_LOOKUP_EXPIRY,
SUBSPACE_PROPERTY,
SUBSPACE_SETTINGS,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_LOGS,
] {
let table = char::from(table);
conn.query_drop(&format!(
"CREATE TABLE IF NOT EXISTS {table} (
@ -121,7 +134,12 @@ impl MysqlStore {
))
.await?;
for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] {
for table in [
SUBSPACE_INDEXES,
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
] {
let table = char::from(table);
conn.query_drop(&format!(
"CREATE TABLE IF NOT EXISTS {table} (
@ -132,15 +150,17 @@ impl MysqlStore {
.await?;
}
conn.query_drop(&format!(
"CREATE TABLE IF NOT EXISTS {} (
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
conn.query_drop(&format!(
"CREATE TABLE IF NOT EXISTS {} (
k TINYBLOB,
v BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY (k(255))
) ENGINE=InnoDB",
char::from(SUBSPACE_COUNTERS)
))
.await?;
char::from(table)
))
.await?;
}
Ok(())
}

View file

@ -66,9 +66,12 @@ impl MysqlStore {
let key_len = begin.len();
let end = key.serialize(0);
let mut conn = self.conn_pool.get_conn().await?;
let table = char::from(key.subspace());
let mut bm = RoaringBitmap::new();
let s = conn.prep("SELECT k FROM b WHERE k >= ? AND k <= ?").await?;
let s = conn
.prep(&format!("SELECT k FROM {table} WHERE k >= ? AND k <= ?"))
.await?;
let mut rows = conn.exec_stream::<Vec<u8>, _, _>(&s, (begin, end)).await?;
while let Some(key) = rows.try_next().await? {
@ -144,9 +147,13 @@ impl MysqlStore {
&self,
key: impl Into<ValueKey<ValueClass<u32>>> + Sync + Send,
) -> crate::Result<i64> {
let key = key.into().serialize(0);
let key = key.into();
let table = char::from(key.subspace());
let key = key.serialize(0);
let mut conn = self.conn_pool.get_conn().await?;
let s = conn.prep("SELECT v FROM c WHERE k = ?").await?;
let s = conn
.prep(&format!("SELECT v FROM {table} WHERE k = ?"))
.await?;
match conn.exec_first::<i64, _, _>(&s, (key,)).await {
Ok(Some(num)) => Ok(num),
Ok(None) => Ok(0),

View file

@ -34,7 +34,7 @@ use crate::{
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
},
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTERS, U32_LEN,
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
};
use super::MysqlStore;
@ -158,22 +158,30 @@ impl MysqlStore {
ValueOp::AtomicAdd(by) => {
if *by >= 0 {
let s = trx
.prep(concat!(
"INSERT INTO c (k, v) VALUES (?, ?) ",
"ON DUPLICATE KEY UPDATE v = v + VALUES(v)"
.prep(&format!(
concat!(
"INSERT INTO {} (k, v) VALUES (?, ?) ",
"ON DUPLICATE KEY UPDATE v = v + VALUES(v)"
),
table
))
.await?;
trx.exec_drop(&s, (key, by)).await?;
} else {
let s = trx.prep("UPDATE c SET v = v + ? WHERE k = ?").await?;
let s = trx
.prep(&format!("UPDATE {table} SET v = v + ? WHERE k = ?"))
.await?;
trx.exec_drop(&s, (by, key)).await?;
}
}
ValueOp::AddAndGet(by) => {
let s = trx
.prep(concat!(
"INSERT INTO c (k, v) VALUES (:k, LAST_INSERT_ID(:v)) ",
"ON DUPLICATE KEY UPDATE v = LAST_INSERT_ID(v + :v)"
.prep(&format!(
concat!(
"INSERT INTO {} (k, v) VALUES (:k, LAST_INSERT_ID(:v)) ",
"ON DUPLICATE KEY UPDATE v = LAST_INSERT_ID(v + :v)"
),
table
))
.await?;
trx.exec_drop(&s, params! {"k" => key, "v" => by}).await?;
@ -251,15 +259,18 @@ impl MysqlStore {
}
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
let table = char::from(class.subspace());
let s = if *set {
if is_document_id {
trx.prep("INSERT INTO b (k) VALUES (?)").await?
} else {
trx.prep("INSERT IGNORE INTO b (k) VALUES (?)").await?
trx.prep(&format!("INSERT IGNORE INTO {} (k) VALUES (?)", table))
.await?
}
} else {
trx.prep("DELETE FROM b WHERE k = ?").await?
trx.prep(&format!("DELETE FROM {} WHERE k = ?", table))
.await?
};
if let Err(err) = trx.exec_drop(&s, (key,)).await {
@ -320,14 +331,14 @@ impl MysqlStore {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let mut conn = self.conn_pool.get_conn().await?;
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
let s = conn
.prep(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))
.await?;
conn.exec_drop(&s, ()).await?;
}
let s = conn
.prep(&format!(
"DELETE FROM {} WHERE v = 0",
char::from(SUBSPACE_COUNTERS),
))
.await?;
conn.exec_drop(&s, ()).await.map_err(Into::into)
Ok(())
}
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {

View file

@ -23,10 +23,7 @@
use std::time::Duration;
use crate::{
backend::postgres::tls::MakeRustlsConnect, SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS,
SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES,
};
use crate::{backend::postgres::tls::MakeRustlsConnect, *};
use super::PostgresStore;
@ -92,7 +89,24 @@ impl PostgresStore {
pub(super) async fn create_tables(&self) -> crate::Result<()> {
let conn = self.conn_pool.get().await?;
for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] {
for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
SUBSPACE_LOOKUP_EXPIRY,
SUBSPACE_PROPERTY,
SUBSPACE_SETTINGS,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
] {
let table = char::from(table);
conn.execute(
&format!(
@ -106,7 +120,12 @@ impl PostgresStore {
.await?;
}
for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] {
for table in [
SUBSPACE_INDEXES,
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
] {
let table = char::from(table);
conn.execute(
&format!(
@ -119,17 +138,19 @@ impl PostgresStore {
.await?;
}
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BYTEA PRIMARY KEY,
v BIGINT NOT NULL DEFAULT 0
)",
char::from(SUBSPACE_COUNTERS)
),
&[],
)
.await?;
char::from(table)
),
&[],
)
.await?;
}
Ok(())
}

View file

@ -65,10 +65,11 @@ impl PostgresStore {
let key_len = begin.len();
let end = key.serialize(0);
let conn = self.conn_pool.get().await?;
let table = char::from(key.subspace());
let mut bm = RoaringBitmap::new();
let s = conn
.prepare_cached("SELECT k FROM b WHERE k >= $1 AND k <= $2")
.prepare_cached(&format!("SELECT k FROM {table} WHERE k >= $1 AND k <= $2"))
.await?;
let rows = conn.query_raw(&s, &[&begin, &end]).await?;
@ -142,9 +143,14 @@ impl PostgresStore {
&self,
key: impl Into<ValueKey<ValueClass<u32>>> + Sync + Send,
) -> crate::Result<i64> {
let key = key.into().serialize(0);
let key = key.into();
let table = char::from(key.subspace());
let key = key.serialize(0);
let conn = self.conn_pool.get().await?;
let s = conn.prepare_cached("SELECT v FROM c WHERE k = $1").await?;
let s = conn
.prepare_cached(&format!("SELECT v FROM {table} WHERE k = $1"))
.await?;
match conn.query_opt(&s, &[&key]).await {
Ok(Some(row)) => row.try_get(0).map_err(Into::into),
Ok(None) => Ok(0),

View file

@ -35,7 +35,7 @@ use crate::{
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
},
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTERS, U32_LEN,
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
};
use super::PostgresStore;
@ -165,24 +165,32 @@ impl PostgresStore {
ValueOp::AtomicAdd(by) => {
if *by >= 0 {
let s = trx
.prepare_cached(concat!(
"INSERT INTO c (k, v) VALUES ($1, $2) ",
"ON CONFLICT(k) DO UPDATE SET v = c.v + EXCLUDED.v"
.prepare_cached(&format!(
concat!(
"INSERT INTO {} (k, v) VALUES ($1, $2) ",
"ON CONFLICT(k) DO UPDATE SET v = {}.v + EXCLUDED.v"
),
table, table
))
.await?;
trx.execute(&s, &[&key, &by]).await?;
} else {
let s = trx
.prepare_cached("UPDATE c SET v = v + $1 WHERE k = $2")
.prepare_cached(&format!(
"UPDATE {table} SET v = v + $1 WHERE k = $2"
))
.await?;
trx.execute(&s, &[&by, &key]).await?;
}
}
ValueOp::AddAndGet(by) => {
let s = trx
.prepare_cached(concat!(
"INSERT INTO c (k, v) VALUES ($1, $2) ",
"ON CONFLICT(k) DO UPDATE SET v = c.v + EXCLUDED.v RETURNING v"
.prepare_cached(&format!(
concat!(
"INSERT INTO {} (k, v) VALUES ($1, $2) ",
"ON CONFLICT(k) DO UPDATE SET v = {}.v + EXCLUDED.v RETURNING v"
),
table, table
))
.await?;
result.push_counter_id(
@ -261,18 +269,21 @@ impl PostgresStore {
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
let table = char::from(class.subspace());
let s = if *set {
if is_document_id {
trx.prepare_cached("INSERT INTO b (k) VALUES ($1)").await?
} else {
trx.prepare_cached(
"INSERT INTO b (k) VALUES ($1) ON CONFLICT (k) DO NOTHING",
)
trx.prepare_cached(&format!(
"INSERT INTO {} (k) VALUES ($1) ON CONFLICT (k) DO NOTHING",
table
))
.await?
}
} else {
trx.prepare_cached("DELETE FROM b WHERE k = $1").await?
trx.prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table))
.await?
};
trx.execute(&s, &[&key]).await.map_err(|err| {
@ -335,13 +346,14 @@ impl PostgresStore {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let conn = self.conn_pool.get().await?;
let s = conn
.prepare_cached(&format!(
"DELETE FROM {} WHERE v = 0",
char::from(SUBSPACE_COUNTERS),
))
.await?;
conn.execute(&s, &[]).await.map(|_| ()).map_err(Into::into)
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
let s = conn
.prepare_cached(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))
.await?;
conn.execute(&s, &[]).await.map(|_| ())?
}
Ok(())
}
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {

View file

@ -1,335 +0,0 @@
/*
* Copyright (c) 2023, Stalwart Labs Ltd.
*
* This file is part of Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use crate::{Deserialize, Serialize};
use roaring::RoaringBitmap;
use utils::codec::leb128::{Leb128Iterator, Leb128Vec};
use crate::U32_LEN;
pub const BIT_SET: u8 = 0x80;
pub const BIT_CLEAR: u8 = 0;
pub const IS_BITLIST: u8 = 0;
pub const IS_BITMAP: u8 = 1;
/// Replays a serialized "bitlist" delta onto `bm` in place.
///
/// Wire format (after the leading `IS_BITLIST` tag byte, skipped here via
/// `bytes[1..]`): a sequence of chunks, each opened by a header byte whose
/// high bit (`BIT_SET`) selects insert vs. remove and whose low 7 bits
/// encode `item_count - 1`, followed by that many LEB128-encoded document
/// ids.
#[inline(always)]
pub fn deserialize_bitlist(bm: &mut RoaringBitmap, bytes: &[u8]) {
    let mut it = bytes[1..].iter();
    'inner: while let Some(header) = it.next() {
        // Low 7 bits store (count - 1), so a full chunk holds 128 ids.
        let mut items = (header & 0x7F) + 1;
        let is_set = (header & BIT_SET) != 0;
        while items > 0 {
            if let Some(doc_id) = it.next_leb128() {
                if is_set {
                    bm.insert(doc_id);
                } else {
                    bm.remove(doc_id);
                }
                items -= 1;
            } else {
                // Input ended mid-chunk: tolerated in release builds,
                // flagged in debug builds as it indicates corrupt data.
                debug_assert!(items == 0, "{:?}", bytes);
                break 'inner;
            }
        }
    }
}
#[inline(always)]
pub fn deserialize_bitmap(bytes: &[u8]) -> crate::Result<RoaringBitmap> {
RoaringBitmap::deserialize_unchecked_from(bytes).map_err(|err| {
crate::Error::InternalError(format!("Failed to deserialize bitmap: {}", err))
})
}
impl Deserialize for RoaringBitmap {
    /// Decodes either storage format based on the leading tag byte:
    /// `IS_BITMAP` (a full serialized roaring bitmap) or `IS_BITLIST`
    /// (a delta list replayed onto a fresh, empty bitmap).
    ///
    /// Returns an `InternalError` for empty input or an unknown tag.
    fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
        match *bytes
            .first()
            .ok_or_else(|| crate::Error::InternalError("Empty bitmap".to_string()))?
        {
            IS_BITMAP => deserialize_bitmap(&bytes[1..]),
            IS_BITLIST => {
                let mut bm = RoaringBitmap::new();
                // deserialize_bitlist skips the tag byte itself, so the
                // full slice is passed through.
                deserialize_bitlist(&mut bm, bytes);
                Ok(bm)
            }
            _ => Err(crate::Error::InternalError(
                "Invalid bitmap type".to_string(),
            )),
        }
    }
}
impl Serialize for RoaringBitmap {
    /// Serializes the bitmap prefixed with a one-byte `IS_BITMAP` tag so
    /// the deserializer can distinguish full bitmaps from bitlist deltas.
    fn serialize(self) -> Vec<u8> {
        let mut out = Vec::with_capacity(1 + self.serialized_size());
        out.push(IS_BITMAP);
        // Writing into a Vec cannot fail; the io::Result is ignored.
        let _ = self.serialize_into(&mut out);
        out
    }
}
/// Generates a pair of bitlist builders: `$single` encodes one document id
/// and `$many` encodes an iterator of ids, both using `$flag` (`BIT_SET`
/// or `BIT_CLEAR`) chunk headers.
macro_rules! impl_bit {
    ($single:ident, $many:ident, $flag:ident) => {
        /// Builds a one-entry bitlist for a single document id.
        #[inline(always)]
        pub fn $single(document: u32) -> Vec<u8> {
            let mut buf = Vec::with_capacity(U32_LEN + 2);
            buf.push(IS_BITLIST);
            // Header with count bits zero, i.e. (count - 1) == 0.
            buf.push($flag);
            buf.push_leb128(document);
            buf
        }

        /// Builds a bitlist for every id produced by `documents`.
        ///
        /// NOTE(review): the final header is patched with
        /// `total_docs & 0x7F`, where `total_docs` is the last 0-based
        /// index — presumably matching the decoder's `(header & 0x7F) + 1`
        /// item count; verify against `deserialize_bitlist`. An empty
        /// iterator is excluded by the debug assertion.
        #[inline(always)]
        pub fn $many<T>(documents: T) -> Vec<u8>
        where
            T: Iterator<Item = u32>,
        {
            debug_assert!(documents.size_hint().0 > 0);
            // Capacity estimate derived from the iterator's size hint.
            let mut buf = Vec::with_capacity(
                ((U32_LEN + 1)
                    * documents
                        .size_hint()
                        .1
                        .unwrap_or_else(|| documents.size_hint().0))
                    + 2,
            );
            buf.push(IS_BITLIST);
            let mut header_pos = 0;
            let mut total_docs = 0;
            for (pos, document) in documents.enumerate() {
                // Open a new chunk every 128 ids; headers are written as
                // "full" (0x7F) and the last one is patched after the loop.
                if pos & 0x7F == 0 {
                    header_pos = buf.len();
                    buf.push($flag | 0x7F);
                }
                buf.push_leb128(document);
                total_docs = pos;
            }
            // Patch the last header with the real count of its chunk.
            buf[header_pos] = $flag | ((total_docs & 0x7F) as u8);
            buf
        }
    };
}

impl_bit!(set_bit, set_bits, BIT_SET);
impl_bit!(clear_bit, clear_bits, BIT_CLEAR);
/// Serializes a mixed stream of `(document_id, is_set)` pairs into a single
/// bitlist buffer: all SET chunks first, then all CLEAR chunks (or just one
/// kind when the input is homogeneous).
///
/// Chunk headers store `(count - 1)` in their low 7 bits, so each chunk
/// holds at most 128 ids; ids are LEB128-encoded.
#[inline(always)]
pub fn set_clear_bits<T>(documents: T) -> Vec<u8>
where
    T: Iterator<Item = (u32, bool)>,
{
    debug_assert!(documents.size_hint().0 > 0);
    // Capacity estimate: worst-case id width per document, plus one header
    // byte per 127 documents, plus the two tag bytes.
    let total_docs = documents
        .size_hint()
        .1
        .unwrap_or_else(|| documents.size_hint().0);
    let buf_len = (U32_LEN * total_docs) + (total_docs / 0x7F) + 2;
    let mut set_buf = Vec::with_capacity(buf_len);
    let mut clear_buf = Vec::with_capacity(buf_len);
    let mut set_header_pos = 0;
    let mut set_total_docs = 0;
    let mut clear_header_pos = 0;
    let mut clear_total_docs = 0;
    set_buf.push(IS_BITLIST);
    clear_buf.push(IS_BITLIST);
    for (document, is_set) in documents {
        if is_set {
            // Open a new SET chunk every 128 ids; the header is written as
            // "full" (0x7F) and the last one is patched below.
            if set_total_docs & 0x7F == 0 {
                set_header_pos = set_buf.len();
                set_buf.push(BIT_SET | 0x7F);
            }
            set_buf.push_leb128(document);
            set_total_docs += 1;
        } else {
            // Same chunking scheme for the CLEAR buffer.
            if clear_total_docs & 0x7F == 0 {
                clear_header_pos = clear_buf.len();
                clear_buf.push(BIT_CLEAR | 0x7F);
            }
            clear_buf.push_leb128(document);
            clear_total_docs += 1;
        }
    }
    // Patch each buffer's final header with the real (count - 1) of its
    // last, possibly partial, chunk.
    if set_total_docs > 0 {
        set_buf[set_header_pos] = BIT_SET | (((set_total_docs - 1) & 0x7F) as u8);
    }
    if clear_total_docs > 0 {
        clear_buf[clear_header_pos] = BIT_CLEAR | (((clear_total_docs - 1) & 0x7F) as u8);
    }
    // Concatenate SET + CLEAR, dropping the clear buffer's redundant tag
    // byte; otherwise return whichever buffer received documents.
    if set_total_docs > 0 && clear_total_docs > 0 {
        set_buf.extend_from_slice(&clear_buf[1..]);
        set_buf
    } else if set_total_docs > 0 {
        set_buf
    } else {
        clear_buf
    }
}
/// Merge function that folds bitmap/bitlist operands into an optional
/// existing value, producing a full serialized bitmap (tagged `IS_BITMAP`).
///
/// Returns `None` when any operand or the existing value is malformed,
/// signalling the caller that the merge failed.
#[inline(always)]
pub fn bitmap_merge<'x>(
    existing_val: Option<&[u8]>,
    operands_len: usize,
    operands: impl IntoIterator<Item = &'x [u8]>,
) -> Option<Vec<u8>> {
    let mut bm = match existing_val {
        Some(existing_val) => RoaringBitmap::deserialize(existing_val).ok()?,
        // Fast path: no existing value and exactly one operand can be
        // passed through verbatim, without decode/re-encode.
        None if operands_len == 1 => {
            return Some(Vec::from(operands.into_iter().next().unwrap()));
        }
        _ => RoaringBitmap::new(),
    };
    for op in operands.into_iter() {
        match *op.first()? {
            IS_BITMAP => {
                if let Ok(union_bm) = deserialize_bitmap(&op[1..]) {
                    // Union into the accumulator; when it is still empty,
                    // take the operand wholesale to skip the union.
                    if !bm.is_empty() {
                        bm |= union_bm;
                    } else {
                        bm = union_bm;
                    }
                } else {
                    debug_assert!(false, "Failed to deserialize bitmap.");
                    return None;
                }
            }
            IS_BITLIST => {
                // Replay insert/remove deltas onto the accumulator.
                deserialize_bitlist(&mut bm, op);
            }
            _ => {
                debug_assert!(false, "This should not have happened");
                return None;
            }
        }
    }
    // Re-serialize the merged result with the IS_BITMAP tag prefix.
    let mut bytes = Vec::with_capacity(bm.serialized_size() + 1);
    bytes.push(IS_BITMAP);
    bm.serialize_into(&mut bytes).ok()?;
    Some(bytes)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// End-to-end check of the bitlist encoding and merge semantics:
    /// later operands override earlier ones, and an existing serialized
    /// bitmap seeds the merge.
    #[test]
    fn merge_bitmaps() {
        // v1: set 1, 2, 4 and clear 3; v2: clear 1 and 4; v3: set 5.
        let v1 = set_clear_bits([(1, true), (2, true), (3, false), (4, true)].into_iter());
        let v2 = set_clear_bits([(1, false), (4, false)].into_iter());
        let v3 = set_clear_bits([(5, true)].into_iter());
        assert_eq!(
            RoaringBitmap::from_iter([1, 2, 4]),
            RoaringBitmap::deserialize(&v1).unwrap()
        );
        // Single operand with no existing value: passed through verbatim.
        assert_eq!(
            RoaringBitmap::from_iter([1, 2, 4]),
            RoaringBitmap::deserialize(&bitmap_merge(None, 1, [v1.as_ref()]).unwrap()).unwrap()
        );
        assert_eq!(
            RoaringBitmap::from_iter([2]),
            RoaringBitmap::deserialize(&bitmap_merge(None, 2, [v1.as_ref(), v2.as_ref()]).unwrap())
                .unwrap()
        );
        assert_eq!(
            RoaringBitmap::from_iter([2, 5]),
            RoaringBitmap::deserialize(
                &bitmap_merge(None, 3, [v1.as_ref(), v2.as_ref(), v3.as_ref()]).unwrap()
            )
            .unwrap()
        );
        // Existing value participates exactly like a leading operand.
        assert_eq!(
            RoaringBitmap::from_iter([2, 5]),
            RoaringBitmap::deserialize(
                &bitmap_merge(Some(v1.as_ref()), 2, [v2.as_ref(), v3.as_ref()]).unwrap()
            )
            .unwrap()
        );
        assert_eq!(
            RoaringBitmap::from_iter([5]),
            RoaringBitmap::deserialize(&bitmap_merge(Some(v2.as_ref()), 1, [v3.as_ref()]).unwrap())
                .unwrap()
        );
        // Full serialized bitmaps can also seed the merge and be operands.
        assert_eq!(
            RoaringBitmap::from_iter([1, 2, 4]),
            RoaringBitmap::deserialize(
                &bitmap_merge(
                    Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()),
                    1,
                    [v1.as_ref()]
                )
                .unwrap()
            )
            .unwrap()
        );
        assert_eq!(
            RoaringBitmap::from_iter([1, 2, 3, 4, 5, 6]),
            RoaringBitmap::deserialize(
                &bitmap_merge(
                    Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()),
                    1,
                    [RoaringBitmap::from_iter([5, 6]).serialize().as_ref()]
                )
                .unwrap()
            )
            .unwrap()
        );
        assert_eq!(
            RoaringBitmap::from_iter([1, 2, 4, 5, 6]),
            RoaringBitmap::deserialize(
                &bitmap_merge(
                    Some(RoaringBitmap::from_iter([1, 2, 3, 4]).serialize().as_ref()),
                    2,
                    [
                        RoaringBitmap::from_iter([5, 6]).serialize().as_ref(),
                        v1.as_ref()
                    ]
                )
                .unwrap()
            )
            .unwrap()
        );
    }
}

View file

@ -28,7 +28,9 @@ use rocksdb::{ColumnFamilyDescriptor, MergeOperands, OptimisticTransactionDB, Op
use tokio::sync::oneshot;
use utils::config::{utils::AsKey, Config};
use super::{RocksDbStore, CF_BITMAPS, CF_BLOBS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES};
use crate::*;
use super::{RocksDbStore, CF_BLOBS};
impl RocksDbStore {
pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
@ -51,14 +53,28 @@ impl RocksDbStore {
let mut cfs = Vec::new();
// Bitmaps
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
cfs.push(ColumnFamilyDescriptor::new(CF_BITMAPS, cf_opts));
for subspace in [
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
] {
let mut cf_opts = Options::default();
cf_opts.set_max_write_buffer_number(16);
cfs.push(ColumnFamilyDescriptor::new(
std::str::from_utf8(&[subspace]).unwrap(),
cf_opts,
));
}
// Counters
let mut cf_opts = Options::default();
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
cfs.push(ColumnFamilyDescriptor::new(CF_COUNTERS, cf_opts));
for subspace in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
let mut cf_opts = Options::default();
cf_opts.set_merge_operator_associative("merge", numeric_value_merge);
cfs.push(ColumnFamilyDescriptor::new(
std::str::from_utf8(&[subspace]).unwrap(),
cf_opts,
));
}
// Blobs
let mut cf_opts = Options::default();
@ -71,9 +87,30 @@ impl RocksDbStore {
cfs.push(ColumnFamilyDescriptor::new(CF_BLOBS, cf_opts));
// Other cfs
for cf in [CF_INDEXES, CF_LOGS, CF_VALUES] {
for subspace in [
SUBSPACE_INDEXES,
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
SUBSPACE_LOOKUP_EXPIRY,
SUBSPACE_PROPERTY,
SUBSPACE_SETTINGS,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
] {
let cf_opts = Options::default();
cfs.push(ColumnFamilyDescriptor::new(cf, cf_opts));
cfs.push(ColumnFamilyDescriptor::new(
std::str::from_utf8(&[subspace]).unwrap(),
cf_opts,
));
}
let mut db_opts = Options::default();

View file

@ -23,25 +23,18 @@
use std::sync::Arc;
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
use rocksdb::{BoundColumnFamily, MultiThreaded, OptimisticTransactionDB};
use crate::{
SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
SUBSPACE_VALUES,
};
use crate::{SUBSPACE_BLOBS, SUBSPACE_INDEXES, SUBSPACE_LOGS};
pub mod bitmap;
pub mod blob;
pub mod main;
pub mod read;
pub mod write;
static CF_BITMAPS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BITMAPS]) };
static CF_VALUES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_VALUES]) };
static CF_LOGS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_LOGS]) };
static CF_INDEXES: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_INDEXES]) };
static CF_BLOBS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_BLOBS]) };
static CF_COUNTERS: &str = unsafe { std::str::from_utf8_unchecked(&[SUBSPACE_COUNTERS]) };
impl From<rocksdb::Error> for crate::Error {
fn from(value: rocksdb::Error) -> Self {
@ -49,6 +42,18 @@ impl From<rocksdb::Error> for crate::Error {
}
}
pub(crate) trait CfHandle {
fn subspace_handle(&self, subspace: u8) -> Arc<BoundColumnFamily<'_>>;
}
impl CfHandle for OptimisticTransactionDB<MultiThreaded> {
#[inline(always)]
fn subspace_handle(&self, subspace: u8) -> Arc<BoundColumnFamily<'_>> {
self.cf_handle(unsafe { std::str::from_utf8_unchecked(&[subspace]) })
.unwrap()
}
}
pub struct RocksDbStore {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
worker_pool: rayon::ThreadPool,

View file

@ -24,13 +24,14 @@
use roaring::RoaringBitmap;
use rocksdb::{Direction, IteratorMode};
use super::RocksDbStore;
use crate::{
backend::rocksdb::CfHandle,
write::{key::DeserializeBigEndian, BitmapClass, ValueClass},
BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN,
};
use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS};
impl RocksDbStore {
pub(crate) async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
where
@ -62,12 +63,13 @@ impl RocksDbStore {
let db = self.db.clone();
self.spawn_worker(move || {
let mut bm = RoaringBitmap::new();
let subspace = key.subspace();
let begin = key.serialize(0);
key.document_id = u32::MAX;
let end = key.serialize(0);
let key_len = begin.len();
for row in db.iterator_cf(
&db.cf_handle(CF_BITMAPS).unwrap(),
&db.subspace_handle(subspace),
IteratorMode::From(&begin, Direction::Forward),
) {
let (key, _) = row?;
@ -92,9 +94,7 @@ impl RocksDbStore {
let db = self.db.clone();
self.spawn_worker(move || {
let cf = db
.cf_handle(std::str::from_utf8(&[params.begin.subspace()]).unwrap())
.unwrap();
let cf = db.subspace_handle(params.begin.subspace());
let begin = params.begin.serialize(0);
let end = params.end.serialize(0);
let it_mode = if params.ascending {
@ -123,10 +123,13 @@ impl RocksDbStore {
&self,
key: impl Into<ValueKey<ValueClass<u32>>> + Sync + Send,
) -> crate::Result<i64> {
let key = key.into().serialize(0);
let key = key.into();
let db = self.db.clone();
self.spawn_worker(move || {
db.get_pinned_cf(&db.cf_handle(CF_COUNTERS).unwrap(), &key)
let cf = self.db.subspace_handle(key.subspace());
let key = key.serialize(0);
db.get_pinned_cf(&cf, &key)
.map_err(Into::into)
.and_then(|bytes| {
Ok(if let Some(bytes) = bytes {

View file

@ -34,14 +34,14 @@ use rocksdb::{
OptimisticTransactionOptions, WriteOptions,
};
use super::{RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES};
use super::{CfHandle, RocksDbStore, CF_INDEXES, CF_LOGS};
use crate::{
backend::deserialize_i64_le,
write::{
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
},
BitmapKey, Deserialize, IndexKey, Key, LogKey, SUBSPACE_COUNTERS, U32_LEN,
BitmapKey, Deserialize, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
};
impl RocksDbStore {
@ -51,11 +51,8 @@ impl RocksDbStore {
self.spawn_worker(move || {
let mut txn = RocksDBTransaction {
db: &db,
cf_bitmaps: db.cf_handle(CF_BITMAPS).unwrap(),
cf_values: db.cf_handle(CF_VALUES).unwrap(),
cf_indexes: db.cf_handle(CF_INDEXES).unwrap(),
cf_logs: db.cf_handle(CF_LOGS).unwrap(),
cf_counters: db.cf_handle(CF_COUNTERS).unwrap(),
txn_opts: OptimisticTransactionOptions::default(),
batch: &batch,
};
@ -121,32 +118,34 @@ impl RocksDbStore {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let db = self.db.clone();
self.spawn_worker(move || {
let cf = db
.cf_handle(std::str::from_utf8(&[SUBSPACE_COUNTERS]).unwrap())
.unwrap();
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
let cf = db
.cf_handle(std::str::from_utf8(&[subspace]).unwrap())
.unwrap();
let mut delete_keys = Vec::new();
let mut delete_keys = Vec::new();
for row in db.iterator_cf(&cf, IteratorMode::Start) {
let (key, value) = row?;
for row in db.iterator_cf(&cf, IteratorMode::Start) {
let (key, value) = row?;
if i64::deserialize(&value)? <= 0 {
delete_keys.push(key);
if i64::deserialize(&value)? == 0 {
delete_keys.push(key);
}
}
}
let txn_opts = OptimisticTransactionOptions::default();
for key in delete_keys {
let txn = db.transaction_opt(&WriteOptions::default(), &txn_opts);
if txn
.get_pinned_for_update_cf(&cf, &key, true)?
.map(|value| i64::deserialize(&value).map(|v| v == 0).unwrap_or(false))
.unwrap_or(false)
{
txn.delete(key)?;
txn.commit()?;
} else {
txn.rollback()?;
let txn_opts = OptimisticTransactionOptions::default();
for key in delete_keys {
let txn = db.transaction_opt(&WriteOptions::default(), &txn_opts);
if txn
.get_pinned_for_update_cf(&cf, &key, true)?
.map(|value| i64::deserialize(&value).map(|v| v == 0).unwrap_or(false))
.unwrap_or(false)
{
txn.delete_cf(&cf, key)?;
txn.commit()?;
} else {
txn.rollback()?;
}
}
}
@ -158,11 +157,8 @@ impl RocksDbStore {
struct RocksDBTransaction<'x> {
db: &'x OptimisticTransactionDB,
cf_bitmaps: Arc<BoundColumnFamily<'x>>,
cf_values: Arc<BoundColumnFamily<'x>>,
cf_indexes: Arc<BoundColumnFamily<'x>>,
cf_logs: Arc<BoundColumnFamily<'x>>,
cf_counters: Arc<BoundColumnFamily<'x>>,
txn_opts: OptimisticTransactionOptions,
batch: &'x Batch,
}
@ -203,18 +199,18 @@ impl<'x> RocksDBTransaction<'x> {
Operation::Value { class, op } => {
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
let is_counter = class.is_counter(collection);
let cf = self.db.subspace_handle(class.subspace(collection));
match op {
ValueOp::Set(value) => {
txn.put_cf(&self.cf_values, &key, value.resolve(&result)?.as_ref())?;
txn.put_cf(&cf, &key, value.resolve(&result)?.as_ref())?;
}
ValueOp::AtomicAdd(by) => {
txn.merge_cf(&self.cf_counters, &key, &by.to_le_bytes()[..])?;
txn.merge_cf(&cf, &key, &by.to_le_bytes()[..])?;
}
ValueOp::AddAndGet(by) => {
let num = txn
.get_pinned_for_update_cf(&self.cf_counters, &key, true)
.get_pinned_for_update_cf(&cf, &key, true)
.map_err(CommitError::from)
.and_then(|bytes| {
if let Some(bytes) = bytes {
@ -225,18 +221,11 @@ impl<'x> RocksDBTransaction<'x> {
Ok(*by)
}
})?;
txn.put_cf(&self.cf_counters, &key, &num.to_le_bytes()[..])?;
txn.put_cf(&cf, &key, &num.to_le_bytes()[..])?;
result.push_counter_id(num);
}
ValueOp::Clear => {
txn.delete_cf(
if is_counter {
&self.cf_counters
} else {
&self.cf_values
},
&key,
)?;
txn.delete_cf(&cf, &key)?;
}
}
}
@ -258,6 +247,7 @@ impl<'x> RocksDBTransaction<'x> {
}
Operation::Bitmap { class, set } => {
let is_document_id = matches!(class, BitmapClass::DocumentIds);
let cf = self.db.subspace_handle(class.subspace());
if *set && is_document_id && document_id == u32::MAX {
let begin = BitmapKey {
account_id,
@ -276,10 +266,9 @@ impl<'x> RocksDBTransaction<'x> {
let key_len = begin.len();
let mut found_ids = RoaringBitmap::new();
for row in txn.iterator_cf(
&self.cf_bitmaps,
IteratorMode::From(&begin, Direction::Forward),
) {
for row in
txn.iterator_cf(&cf, IteratorMode::From(&begin, Direction::Forward))
{
let (key, _) = row?;
let key = key.as_ref();
if key.len() == key_len
@ -299,9 +288,9 @@ impl<'x> RocksDBTransaction<'x> {
class.serialize(account_id, collection, document_id, 0, (&result).into());
if *set {
txn.put_cf(&self.cf_bitmaps, &key, [])?;
txn.put_cf(&cf, &key, [])?;
} else {
txn.delete_cf(&self.cf_bitmaps, &key)?;
txn.delete_cf(&cf, &key)?;
}
}
Operation::Log { set } => {
@ -320,9 +309,10 @@ impl<'x> RocksDBTransaction<'x> {
} => {
let key =
class.serialize(account_id, collection, document_id, 0, (&result).into());
let cf = self.db.subspace_handle(class.subspace(collection));
let matches = txn
.get_pinned_for_update_cf(&self.cf_values, &key, true)?
.get_pinned_for_update_cf(&cf, &key, true)?
.map(|value| assert_value.matches(&value))
.unwrap_or_else(|| assert_value.is_none());

View file

@ -25,10 +25,7 @@ use r2d2::Pool;
use tokio::sync::oneshot;
use utils::config::{utils::AsKey, Config};
use crate::{
SUBSPACE_BITMAPS, SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS,
SUBSPACE_VALUES,
};
use crate::*;
use super::{pool::SqliteConnectionManager, SqliteStore};
@ -105,7 +102,24 @@ impl SqliteStore {
pub(super) fn create_tables(&self) -> crate::Result<()> {
let conn = self.conn_pool.get()?;
for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] {
for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
SUBSPACE_LOOKUP_EXPIRY,
SUBSPACE_PROPERTY,
SUBSPACE_SETTINGS,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
] {
let table = char::from(table);
conn.execute(
&format!(
@ -118,7 +132,12 @@ impl SqliteStore {
)?;
}
for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] {
for table in [
SUBSPACE_INDEXES,
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
] {
let table = char::from(table);
conn.execute(
&format!(
@ -130,16 +149,18 @@ impl SqliteStore {
)?;
}
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
for table in [SUBSPACE_COUNTER, SUBSPACE_QUOTA] {
conn.execute(
&format!(
"CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY,
v INTEGER NOT NULL DEFAULT 0
)",
char::from(SUBSPACE_COUNTERS)
),
[],
)?;
char::from(table)
),
[],
)?;
}
Ok(())
}

View file

@ -63,10 +63,12 @@ impl SqliteStore {
let key_len = begin.len();
let end = key.serialize(0);
let conn = self.conn_pool.get()?;
let table = char::from(key.subspace());
self.spawn_worker(move || {
let mut bm = RoaringBitmap::new();
let mut query = conn.prepare_cached("SELECT k FROM b WHERE k >= ? AND k <= ?")?;
let mut query =
conn.prepare_cached(&format!("SELECT k FROM {table} WHERE k >= ? AND k <= ?"))?;
let mut rows = query.query([&begin, &end])?;
while let Some(row) = rows.next()? {
@ -139,11 +141,13 @@ impl SqliteStore {
&self,
key: impl Into<ValueKey<ValueClass<u32>>> + Sync + Send,
) -> crate::Result<i64> {
let key = key.into().serialize(0);
let key = key.into();
let table = char::from(key.subspace());
let key = key.serialize(0);
let conn = self.conn_pool.get()?;
self.spawn_worker(move || {
match conn
.prepare_cached("SELECT v FROM c WHERE k = ?")?
.prepare_cached(&format!("SELECT v FROM {table} WHERE k = ?"))?
.query_row([&key], |row| row.get::<_, i64>(0))
{
Ok(value) => Ok(value),

View file

@ -29,7 +29,7 @@ use crate::{
key::DeserializeBigEndian, AssignedIds, Batch, BitmapClass, Operation, RandomAvailableId,
ValueOp,
},
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTERS, U32_LEN,
BitmapKey, IndexKey, Key, LogKey, SUBSPACE_COUNTER, SUBSPACE_QUOTA, U32_LEN,
};
use super::SqliteStore;
@ -81,22 +81,30 @@ impl SqliteStore {
}
ValueOp::AtomicAdd(by) => {
if *by >= 0 {
trx.prepare_cached(concat!(
"INSERT INTO c (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + excluded.v"
trx.prepare_cached(&format!(
concat!(
"INSERT INTO {} (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + excluded.v"
),
table
))?
.execute(params![&key, *by])?;
} else {
trx.prepare_cached("UPDATE c SET v = v + ? WHERE k = ?")?
.execute(params![*by, &key])?;
trx.prepare_cached(&format!(
"UPDATE {table} SET v = v + ? WHERE k = ?"
))?
.execute(params![*by, &key])?;
}
}
ValueOp::AddAndGet(by) => {
result.push_counter_id(
trx.prepare_cached(concat!(
"INSERT INTO c (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + ",
"excluded.v RETURNING v"
trx.prepare_cached(&format!(
concat!(
"INSERT INTO {} (k, v) VALUES (?, ?) ",
"ON CONFLICT(k) DO UPDATE SET v = v + ",
"excluded.v RETURNING v"
),
table
))?
.query_row(params![&key, &by], |row| row.get::<_, i64>(0))?,
);
@ -166,17 +174,21 @@ impl SqliteStore {
0,
(&result).into(),
);
let table = char::from(class.subspace());
if *set {
if is_document_id {
trx.prepare_cached("INSERT INTO b (k) VALUES (?)")?
.execute(params![&key])?;
} else {
trx.prepare_cached("INSERT OR IGNORE INTO b (k) VALUES (?)")?
.execute(params![&key])?;
trx.prepare_cached(&format!(
"INSERT OR IGNORE INTO {} (k) VALUES (?)",
table
))?
.execute(params![&key])?;
}
} else {
trx.prepare_cached("DELETE FROM b WHERE k = ?")?
trx.prepare_cached(&format!("DELETE FROM {} WHERE k = ?", table))?
.execute(params![&key])?;
};
}
@ -227,11 +239,10 @@ impl SqliteStore {
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
let conn = self.conn_pool.get()?;
self.spawn_worker(move || {
conn.prepare_cached(&format!(
"DELETE FROM {} WHERE v = 0",
char::from(SUBSPACE_COUNTERS),
))?
.execute([])?;
for subspace in [SUBSPACE_QUOTA, SUBSPACE_COUNTER] {
conn.prepare_cached(&format!("DELETE FROM {} WHERE v = 0", char::from(subspace),))?
.execute([])?;
}
Ok(())
})

View file

@ -292,7 +292,7 @@ impl LookupStore {
store
.iterate(IterateParams::new(from_key, to_key), |key, value| {
if value.deserialize_be_u64(0)? <= current_time {
expired_keys.push(key.get(1..).unwrap_or_default().to_vec());
expired_keys.push(key.to_vec());
}
Ok(true)
})
@ -327,7 +327,7 @@ impl LookupStore {
store
.iterate(IterateParams::new(from_key, to_key), |key, value| {
if value.deserialize_be_u64(0)? <= current_time {
expired_keys.push(key.get(1..).unwrap_or_default().to_vec());
expired_keys.push(key.to_vec());
}
Ok(true)
})

View file

@ -29,8 +29,8 @@ use crate::{
write::{
key::KeySerializer, now, AnyKey, AssignedIds, Batch, BitmapClass, ReportClass, ValueClass,
},
BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, SUBSPACE_BITMAPS,
SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN,
BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN,
};
#[cfg(feature = "test_mode")]
@ -303,7 +303,13 @@ impl Store {
}
pub async fn purge_account(&self, account_id: u32) -> crate::Result<()> {
for subspace in [SUBSPACE_BITMAPS, SUBSPACE_LOGS, SUBSPACE_INDEXES] {
for subspace in [
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
SUBSPACE_LOGS,
SUBSPACE_INDEXES,
] {
self.delete_range(
AnyKey {
subspace,
@ -396,15 +402,31 @@ impl Store {
#[cfg(feature = "test_mode")]
pub async fn destroy(&self) {
use crate::{SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_VALUES};
use crate::*;
for subspace in [
SUBSPACE_VALUES,
SUBSPACE_LOGS,
SUBSPACE_BITMAPS,
SUBSPACE_ACL,
SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_INDEXES,
SUBSPACE_COUNTERS,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOGS,
SUBSPACE_LOOKUP_VALUE,
SUBSPACE_COUNTER,
SUBSPACE_LOOKUP_EXPIRY,
SUBSPACE_PROPERTY,
SUBSPACE_SETTINGS,
SUBSPACE_BLOBS,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_QUOTA,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
] {
self.delete_range(
AnyKey {
@ -464,7 +486,7 @@ impl Store {
self.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let account_id = key.deserialize_be_u32(1)?;
let account_id = key.deserialize_be_u32(0)?;
if account_id != last_account_id {
last_account_id = account_id;
batch.with_account_id(account_id);
@ -473,7 +495,7 @@ impl Store {
batch.ops.push(Operation::Value {
class: ValueClass::Blob(BlobOp::Reserve {
hash: BlobHash::try_from_hash_slice(
key.get(1 + U32_LEN..1 + U32_LEN + BLOB_HASH_LEN).unwrap(),
key.get(U32_LEN..U32_LEN + BLOB_HASH_LEN).unwrap(),
)
.unwrap(),
until: key.deserialize_be_u64(key.len() - U64_LEN)?,
@ -489,15 +511,88 @@ impl Store {
self.write(batch.build()).await.unwrap();
}
/// Test-only helper that clears every `Lookup` entry from the store so that
/// `assert_is_empty` can later verify a fully clean state.
///
/// Runs two passes: first it removes plain `LookupClass::Key` entries, then it
/// removes each `CounterExpiry` marker together with its paired `Counter`.
/// Writes are flushed in batches of at most 1000 operations.
///
/// NOTE(review): the first pass only deletes keys whose value starts with a
/// non-zero big-endian u64 — presumably an expiry timestamp in the value
/// layout; confirm against how `LookupClass::Key` values are serialized.
#[cfg(feature = "test_mode")]
pub async fn lookup_expire_all(&self) {
    use crate::write::{
        key::DeserializeBigEndian, BatchBuilder, LookupClass, Operation, ValueOp,
    };

    // Delete all temporary counters
    // Scan the full `LookupClass::Key` range; `[0u8]` .. `[u8::MAX; 10]`
    // brackets every serialized lookup key.
    let from_key = ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![0u8])));
    let to_key = ValueKey::from(ValueClass::Lookup(LookupClass::Key(vec![u8::MAX; 10])));
    let mut expired_keys = Vec::new();
    self.iterate(IterateParams::new(from_key, to_key), |key, value| {
        // A non-zero leading u64 in the value marks this entry for deletion.
        if value.deserialize_be_u64(0)? != 0 {
            expired_keys.push(key.to_vec());
        }
        Ok(true)
    })
    .await
    .unwrap();
    if !expired_keys.is_empty() {
        let mut batch = BatchBuilder::new();
        for key in expired_keys {
            batch.ops.push(Operation::Value {
                class: ValueClass::Lookup(LookupClass::Key(key)),
                op: ValueOp::Clear,
            });
            // Flush periodically so a single write batch stays bounded.
            if batch.ops.len() >= 1000 {
                self.write(batch.build()).await.unwrap();
                batch = BatchBuilder::new();
            }
        }
        if !batch.ops.is_empty() {
            self.write(batch.build()).await.unwrap();
        }
    }

    // Delete expired counters
    // Every `CounterExpiry` key found is removed unconditionally, along with
    // the counter it guards.
    let from_key = ValueKey::from(ValueClass::Lookup(LookupClass::CounterExpiry(vec![0u8])));
    let to_key = ValueKey::from(ValueClass::Lookup(LookupClass::CounterExpiry(vec![
        u8::MAX;
        10
    ])));
    let mut expired_keys = Vec::new();
    self.iterate(IterateParams::new(from_key, to_key), |key, _| {
        expired_keys.push(key.to_vec());
        Ok(true)
    })
    .await
    .unwrap();
    if !expired_keys.is_empty() {
        let mut batch = BatchBuilder::new();
        for key in expired_keys {
            // Clear both the counter value and its expiry marker; the same
            // raw key bytes address both classes.
            batch.ops.push(Operation::Value {
                class: ValueClass::Lookup(LookupClass::Counter(key.clone())),
                op: ValueOp::Clear,
            });
            batch.ops.push(Operation::Value {
                class: ValueClass::Lookup(LookupClass::CounterExpiry(key)),
                op: ValueOp::Clear,
            });
            if batch.ops.len() >= 1000 {
                self.write(batch.build()).await.unwrap();
                batch = BatchBuilder::new();
            }
        }
        if !batch.ops.is_empty() {
            self.write(batch.build()).await.unwrap();
        }
    }
}
#[cfg(feature = "test_mode")]
#[allow(unused_variables)]
pub async fn assert_is_empty(&self, blob_store: crate::BlobStore) {
use utils::codec::leb128::Leb128Iterator;
use crate::{SUBSPACE_BLOBS, SUBSPACE_COUNTERS, SUBSPACE_VALUES};
use crate::*;
self.blob_expire_all().await;
self.lookup_expire_all().await;
self.purge_blobs(blob_store).await.unwrap();
self.purge_store().await.unwrap();
@ -505,10 +600,27 @@ impl Store {
let mut failed = false;
for (subspace, with_values) in [
(SUBSPACE_VALUES, true),
(SUBSPACE_COUNTERS, false),
(SUBSPACE_ACL, true),
//(SUBSPACE_DIRECTORY, true),
(SUBSPACE_FTS_INDEX, true),
(SUBSPACE_LOOKUP_VALUE, true),
(SUBSPACE_LOOKUP_EXPIRY, true),
(SUBSPACE_PROPERTY, true),
(SUBSPACE_SETTINGS, true),
(SUBSPACE_QUEUE_MESSAGE, true),
(SUBSPACE_QUEUE_EVENT, true),
(SUBSPACE_REPORT_OUT, true),
(SUBSPACE_REPORT_IN, true),
(SUBSPACE_TERM_INDEX, true),
(SUBSPACE_BLOB_RESERVE, true),
(SUBSPACE_BLOB_LINK, true),
(SUBSPACE_BLOBS, true),
(SUBSPACE_BITMAPS, false),
(SUBSPACE_COUNTER, false),
(SUBSPACE_QUOTA, false),
(SUBSPACE_BLOBS, true),
(SUBSPACE_BITMAP_ID, false),
(SUBSPACE_BITMAP_TAG, false),
(SUBSPACE_BITMAP_TEXT, false),
(SUBSPACE_INDEXES, false),
] {
let from_key = crate::write::AnyKey {
@ -524,18 +636,11 @@ impl Store {
IterateParams::new(from_key, to_key).set_values(with_values),
|key, value| {
match subspace {
SUBSPACE_BITMAPS => {
SUBSPACE_BITMAP_ID | SUBSPACE_BITMAP_TAG | SUBSPACE_BITMAP_TEXT => {
if key.get(0..4).unwrap_or_default() == u32::MAX.to_be_bytes() {
return Ok(true);
}
#[cfg(feature = "rocks")]
if matches!(store, Self::RocksDb(_))
&& RoaringBitmap::deserialize(value).unwrap().is_empty()
{
return Ok(true);
}
const BM_DOCUMENT_IDS: u8 = 0;
const BM_TAG: u8 = 1 << 6;
const BM_TEXT: u8 = 1 << 7;
@ -586,18 +691,6 @@ impl Store {
value
);
}
SUBSPACE_VALUES
if [3, 9, 10].contains(&key[0])
|| (key[0] >= 20 && key[0] < 30)
|| key.get(1..5).unwrap_or_default() == u32::MAX.to_be_bytes() =>
{
// Ignore lastId counter and ID mappings
return Ok(true);
}
SUBSPACE_COUNTERS if key[0] == 9 || key.len() <= 4 => {
// Ignore named keys
return Ok(true);
}
SUBSPACE_INDEXES => {
println!(
concat!(

View file

@ -168,12 +168,33 @@ impl From<String> for Error {
}
}
// One subspace discriminator byte per key family. Each backend maps a
// subspace to its own table / column family, so every constant below must be
// unique: two subspaces sharing a byte would collide in the backing store.
// (The stale pre-refactor constants SUBSPACE_BITMAPS = b'b',
// SUBSPACE_VALUES = b'v' and SUBSPACE_COUNTERS = b'c' are removed here: they
// duplicated SUBSPACE_LOGS' definition and clashed with the new
// SUBSPACE_BITMAP_ID / SUBSPACE_BITMAP_TEXT / SUBSPACE_BITMAP_TAG bytes.)
pub const SUBSPACE_ACL: u8 = b'a';
pub const SUBSPACE_BITMAP_ID: u8 = b'b';
pub const SUBSPACE_BITMAP_TAG: u8 = b'c';
pub const SUBSPACE_BITMAP_TEXT: u8 = b'v';
pub const SUBSPACE_DIRECTORY: u8 = b'd';
pub const SUBSPACE_FTS_INDEX: u8 = b'f';
pub const SUBSPACE_INDEXES: u8 = b'i';
pub const SUBSPACE_BLOB_RESERVE: u8 = b'j';
pub const SUBSPACE_BLOB_LINK: u8 = b'k';
pub const SUBSPACE_LOGS: u8 = b'l';
pub const SUBSPACE_COUNTER: u8 = b'n';
pub const SUBSPACE_LOOKUP_VALUE: u8 = b'm';
pub const SUBSPACE_LOOKUP_EXPIRY: u8 = b'o';
pub const SUBSPACE_PROPERTY: u8 = b'p';
pub const SUBSPACE_SETTINGS: u8 = b's';
pub const SUBSPACE_BLOBS: u8 = b't';
pub const SUBSPACE_QUEUE_MESSAGE: u8 = b'e';
pub const SUBSPACE_QUEUE_EVENT: u8 = b'q';
pub const SUBSPACE_QUOTA: u8 = b'u';
pub const SUBSPACE_REPORT_OUT: u8 = b'h';
pub const SUBSPACE_REPORT_IN: u8 = b'r';
pub const SUBSPACE_TERM_INDEX: u8 = b'g';
// Unassigned bytes kept reserved for future key families.
pub const SUBSPACE_RESERVED_1: u8 = b'w';
pub const SUBSPACE_RESERVED_2: u8 = b'x';
pub const SUBSPACE_RESERVED_3: u8 = b'y';
pub const SUBSPACE_RESERVED_4: u8 = b'z';
pub struct IterateParams<T: Key> {
begin: T,

View file

@ -112,9 +112,9 @@ impl Store {
self.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
if account_id == key.deserialize_be_u32(U32_LEN + 1)? {
if account_id == key.deserialize_be_u32(U32_LEN)? {
delete_keys.push((
ValueClass::Acl(key.deserialize_be_u32(1)?),
ValueClass::Acl(key.deserialize_be_u32(0)?),
AclItem::deserialize(key)?,
));
}
@ -156,11 +156,11 @@ impl Store {
impl Deserialize for AclItem {
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
Ok(AclItem {
to_account_id: bytes.deserialize_be_u32(U32_LEN + 1)?,
to_account_id: bytes.deserialize_be_u32(U32_LEN)?,
to_collection: *bytes
.get((U32_LEN * 2) + 1)
.get(U32_LEN * 2)
.ok_or_else(|| Error::InternalError(format!("Corrupted acl key {bytes:?}")))?,
to_document_id: bytes.deserialize_be_u32((U32_LEN * 2) + 2)?,
to_document_id: bytes.deserialize_be_u32((U32_LEN * 2) + 1)?,
permissions: 0,
})
}

View file

@ -159,17 +159,16 @@ impl Store {
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let hash = BlobHash::try_from_hash_slice(
key.get(1 + U32_LEN..1 + U32_LEN + BLOB_HASH_LEN)
.ok_or_else(|| {
crate::Error::InternalError(format!(
"Invalid key {key:?} in blob hash tables"
))
})?,
key.get(U32_LEN..U32_LEN + BLOB_HASH_LEN).ok_or_else(|| {
crate::Error::InternalError(format!(
"Invalid key {key:?} in blob hash tables"
))
})?,
)
.unwrap();
let until = key.deserialize_be_u64(key.len() - U64_LEN)?;
if until <= now {
delete_keys.push((key.deserialize_be_u32(1)?, BlobOp::Reserve { until, hash }));
delete_keys.push((key.deserialize_be_u32(0)?, BlobOp::Reserve { until, hash }));
} else {
active_hashes.insert(hash);
}
@ -199,14 +198,13 @@ impl Store {
self.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let hash = BlobHash::try_from_hash_slice(
key.get(1..1 + BLOB_HASH_LEN).ok_or_else(|| {
let hash =
BlobHash::try_from_hash_slice(key.get(0..BLOB_HASH_LEN).ok_or_else(|| {
crate::Error::InternalError(format!(
"Invalid key {key:?} in blob hash tables"
))
})?,
)
.unwrap();
})?)
.unwrap();
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if document_id != u32::MAX {
@ -279,15 +277,13 @@ impl Store {
|key, _| {
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if document_id != u32::MAX
&& key.deserialize_be_u32(1 + BLOB_HASH_LEN)? == account_id
{
if document_id != u32::MAX && key.deserialize_be_u32(BLOB_HASH_LEN)? == account_id {
delete_keys.push((
key[1 + BLOB_HASH_LEN + U32_LEN],
key[BLOB_HASH_LEN + U32_LEN],
document_id,
BlobOp::Link {
hash: BlobHash::try_from_hash_slice(
key.get(1..1 + BLOB_HASH_LEN).ok_or_else(|| {
key.get(0..BLOB_HASH_LEN).ok_or_else(|| {
crate::Error::InternalError(format!(
"Invalid key {key:?} in blob hash tables"
))

View file

@ -25,9 +25,12 @@ use std::convert::TryInto;
use utils::{codec::leb128::Leb128_, BLOB_HASH_LEN};
use crate::{
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, SUBSPACE_BITMAPS,
SUBSPACE_COUNTERS, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_VALUES, U32_LEN, U64_LEN,
WITH_SUBSPACE,
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, SUBSPACE_ACL,
SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_BLOB_LINK,
SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX,
SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_EXPIRY, SUBSPACE_LOOKUP_VALUE,
SUBSPACE_PROPERTY, SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA,
SUBSPACE_REPORT_OUT, SUBSPACE_SETTINGS, SUBSPACE_TERM_INDEX, U32_LEN, U64_LEN, WITH_SUBSPACE,
};
use super::{
@ -255,119 +258,110 @@ impl<T: ResolveId> ValueClass<T> {
match self {
ValueClass::Property(field) => serializer
.write(0u8)
.write(account_id)
.write(collection)
.write(*field)
.write(document_id),
ValueClass::TermIndex => serializer
.write(1u8)
.write(account_id)
.write(collection)
.write_leb128(document_id),
ValueClass::Acl(grant_account_id) => serializer
.write(2u8)
.write(*grant_account_id)
.write(account_id)
.write(collection)
.write(document_id),
ValueClass::IndexEmail(seq) => serializer
.write(5u8)
.write(*seq)
.write(account_id)
.write(document_id),
ValueClass::IndexEmail(seq) => {
serializer.write(*seq).write(account_id).write(document_id)
}
ValueClass::Blob(op) => match op {
BlobOp::Reserve { hash, until } => serializer
.write(6u8)
.write(account_id)
.write::<&[u8]>(hash.as_ref())
.write(*until),
BlobOp::Commit { hash } => serializer
.write(7u8)
.write::<&[u8]>(hash.as_ref())
.write(u32::MAX)
.write(0u8)
.write(u32::MAX),
BlobOp::Link { hash } => serializer
.write(7u8)
.write::<&[u8]>(hash.as_ref())
.write(account_id)
.write(collection)
.write(document_id),
},
ValueClass::Config(key) => serializer.write(8u8).write(key.as_slice()),
ValueClass::Config(key) => serializer.write(key.as_slice()),
ValueClass::Lookup(lookup) => match lookup {
LookupClass::Key(key) => serializer.write(4u8).write(key.as_slice()),
LookupClass::Counter(key) => serializer.write(9u8).write(key.as_slice()),
LookupClass::CounterExpiry(key) => serializer.write(10u8).write(key.as_slice()),
LookupClass::Key(key) => serializer.write(key.as_slice()),
LookupClass::Counter(key) => serializer.write(key.as_slice()),
LookupClass::CounterExpiry(key) => serializer.write(key.as_slice()),
},
ValueClass::Directory(directory) => match directory {
DirectoryClass::NameToId(name) => serializer.write(20u8).write(name.as_slice()),
DirectoryClass::EmailToId(email) => serializer.write(21u8).write(email.as_slice()),
DirectoryClass::NameToId(name) => serializer.write(0u8).write(name.as_slice()),
DirectoryClass::EmailToId(email) => serializer.write(1u8).write(email.as_slice()),
DirectoryClass::Principal(uid) => serializer
.write(22u8)
.write(2u8)
.write_leb128(uid.resolve_id(assigned_ids)),
DirectoryClass::Domain(name) => serializer.write(23u8).write(name.as_slice()),
DirectoryClass::UsedQuota(uid) => serializer.write(24u8).write_leb128(*uid),
DirectoryClass::Domain(name) => serializer.write(3u8).write(name.as_slice()),
DirectoryClass::UsedQuota(uid) => serializer.write(4u8).write_leb128(*uid),
DirectoryClass::MemberOf {
principal_id,
member_of,
} => serializer
.write(25u8)
.write(5u8)
.write(principal_id.resolve_id(assigned_ids))
.write(member_of.resolve_id(assigned_ids)),
DirectoryClass::Members {
principal_id,
has_member,
} => serializer
.write(26u8)
.write(6u8)
.write(principal_id.resolve_id(assigned_ids))
.write(has_member.resolve_id(assigned_ids)),
},
ValueClass::Queue(queue) => match queue {
QueueClass::Message(queue_id) => serializer.write(50u8).write(*queue_id),
QueueClass::MessageEvent(event) => serializer
.write(51u8)
.write(event.due)
.write(event.queue_id),
QueueClass::Message(queue_id) => serializer.write(*queue_id),
QueueClass::MessageEvent(event) => {
serializer.write(event.due).write(event.queue_id)
}
QueueClass::DmarcReportHeader(event) => serializer
.write(52u8)
.write(0u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id)
.write(0u8),
QueueClass::TlsReportHeader(event) => serializer
.write(52u8)
.write(0u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id)
.write(1u8),
QueueClass::DmarcReportEvent(event) => serializer
.write(53u8)
.write(1u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id),
QueueClass::TlsReportEvent(event) => serializer
.write(54u8)
.write(2u8)
.write(event.due)
.write(event.domain.as_bytes())
.write(event.policy_hash)
.write(event.seq_id),
QueueClass::QuotaCount(key) => serializer.write(55u8).write(key.as_slice()),
QueueClass::QuotaSize(key) => serializer.write(56u8).write(key.as_slice()),
QueueClass::QuotaCount(key) => serializer.write(0u8).write(key.as_slice()),
QueueClass::QuotaSize(key) => serializer.write(1u8).write(key.as_slice()),
},
ValueClass::Report(report) => match report {
ReportClass::Tls { id, expires } => {
serializer.write(60u8).write(*expires).write(*id)
serializer.write(0u8).write(*expires).write(*id)
}
ReportClass::Dmarc { id, expires } => {
serializer.write(61u8).write(*expires).write(*id)
serializer.write(1u8).write(*expires).write(*id)
}
ReportClass::Arf { id, expires } => {
serializer.write(62u8).write(*expires).write(*id)
serializer.write(2u8).write(*expires).write(*id)
}
},
}
@ -401,7 +395,7 @@ impl<T: AsRef<[u8]> + Sync + Send> Key for IndexKey<T> {
impl<T: AsRef<BitmapClass<u32>> + Sync + Send> Key for BitmapKey<T> {
fn subspace(&self) -> u8 {
SUBSPACE_BITMAPS
self.class.as_ref().subspace()
}
fn serialize(&self, flags: u32) -> Vec<u8> {
@ -416,6 +410,14 @@ impl<T: AsRef<BitmapClass<u32>> + Sync + Send> Key for BitmapKey<T> {
}
impl<T: ResolveId> BitmapClass<T> {
/// Returns the storage subspace byte for this bitmap class.
///
/// Each bitmap family lives in its own subspace (see the
/// `SUBSPACE_BITMAP_*` constants) so backends can store and purge
/// them independently.
pub fn subspace(&self) -> u8 {
    match self {
        // Per-collection document-id presence bitmaps.
        BitmapClass::DocumentIds => SUBSPACE_BITMAP_ID,
        // Tag bitmaps (id / static / text tag values).
        BitmapClass::Tag { .. } => SUBSPACE_BITMAP_TAG,
        // Token bitmaps keyed on hashed text tokens.
        BitmapClass::Text { .. } => SUBSPACE_BITMAP_TEXT,
    }
}
pub fn serialize(
&self,
account_id: u32,
@ -424,65 +426,67 @@ impl<T: ResolveId> BitmapClass<T> {
flags: u32,
assigned_ids: Option<&AssignedIds>,
) -> Vec<u8> {
const BM_DOCUMENT_IDS: u8 = 0;
const BM_TAG: u8 = 1 << 6;
const BM_TEXT: u8 = 1 << 7;
const TAG_ID: u8 = 0;
const TAG_TEXT: u8 = 1 << 0;
const TAG_STATIC: u8 = 1 << 1;
const BM_MARKER: u8 = 1 << 7;
match self {
BitmapClass::DocumentIds => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 3).write(SUBSPACE_BITMAPS)
KeySerializer::new(U32_LEN + 2).write(SUBSPACE_BITMAP_ID)
} else {
KeySerializer::new(U32_LEN + 2)
KeySerializer::new(U32_LEN + 1)
}
.write(account_id)
.write(collection)
.write(BM_DOCUMENT_IDS),
.write(collection),
BitmapClass::Tag { field, value } => match value {
TagValue::Id(id) => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new((U32_LEN * 2) + 4).write(SUBSPACE_BITMAPS)
KeySerializer::new((U32_LEN * 2) + 4).write(SUBSPACE_BITMAP_TAG)
} else {
KeySerializer::new((U32_LEN * 2) + 3)
}
.write(account_id)
.write(collection)
.write(BM_TAG | TAG_ID)
.write(collection | BM_MARKER)
.write(*field)
.write_leb128(id.resolve_id(assigned_ids)),
TagValue::Text(text) => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 4 + text.len()).write(SUBSPACE_BITMAPS)
} else {
KeySerializer::new(U32_LEN + 3 + text.len())
}
.write(account_id)
.write(collection)
.write(BM_TAG | TAG_TEXT)
.write(*field)
.write(text.as_slice()),
TagValue::Static(id) => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 5).write(SUBSPACE_BITMAPS)
KeySerializer::new(U32_LEN + 5).write(SUBSPACE_BITMAP_TAG)
} else {
KeySerializer::new(U32_LEN + 4)
}
.write(account_id)
.write(collection)
.write(BM_TAG | TAG_STATIC)
.write(*field)
.write(*id),
TagValue::Text(text) => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 4 + text.len()).write(SUBSPACE_BITMAP_TAG)
} else {
KeySerializer::new(U32_LEN + 3 + text.len())
}
.write(account_id)
.write(collection)
.write(*field)
.write(text.as_slice()),
},
BitmapClass::Text { field, token } => if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 16 + 3 + 1).write(SUBSPACE_BITMAPS)
} else {
KeySerializer::new(U32_LEN + 16 + 3)
BitmapClass::Text { field, token } => {
let serializer = if (flags & WITH_SUBSPACE) != 0 {
KeySerializer::new(U32_LEN + 16 + 3 + 1).write(SUBSPACE_BITMAP_TEXT)
} else {
KeySerializer::new(U32_LEN + 16 + 3)
}
.write(account_id)
.write(
token
.hash
.get(0..std::cmp::min(token.len as usize, 8))
.unwrap(),
);
if token.len >= 8 {
serializer.write(token.len)
} else {
serializer
}
.write(collection)
.write(*field)
}
.write(account_id)
.write(collection)
.write(BM_TEXT | token.len)
.write(*field)
.write(token.hash.as_slice()),
}
.write(document_id)
.finalize()
@ -543,10 +547,41 @@ impl<T> ValueClass<T> {
}
pub fn subspace(&self, collection: u8) -> u8 {
if self.is_counter(collection) {
SUBSPACE_COUNTERS
} else {
SUBSPACE_VALUES
match self {
ValueClass::Property(field) => {
if *field == 84 && collection == 1 {
SUBSPACE_COUNTER
} else {
SUBSPACE_PROPERTY
}
}
ValueClass::TermIndex => SUBSPACE_TERM_INDEX,
ValueClass::Acl(_) => SUBSPACE_ACL,
ValueClass::IndexEmail(_) => SUBSPACE_FTS_INDEX,
ValueClass::Blob(op) => match op {
BlobOp::Reserve { .. } => SUBSPACE_BLOB_RESERVE,
BlobOp::Commit { .. } | BlobOp::Link { .. } => SUBSPACE_BLOB_LINK,
},
ValueClass::Config(_) => SUBSPACE_SETTINGS,
ValueClass::Lookup(lookup) => match lookup {
LookupClass::Key(_) => SUBSPACE_LOOKUP_VALUE,
LookupClass::Counter(_) => SUBSPACE_COUNTER,
LookupClass::CounterExpiry(_) => SUBSPACE_LOOKUP_EXPIRY,
},
ValueClass::Directory(directory) => match directory {
DirectoryClass::UsedQuota(_) => SUBSPACE_QUOTA,
_ => SUBSPACE_DIRECTORY,
},
ValueClass::Queue(queue) => match queue {
QueueClass::Message(_) => SUBSPACE_QUEUE_MESSAGE,
QueueClass::MessageEvent(_) => SUBSPACE_QUEUE_EVENT,
QueueClass::DmarcReportHeader(_)
| QueueClass::TlsReportHeader(_)
| QueueClass::DmarcReportEvent(_)
| QueueClass::TlsReportEvent(_) => SUBSPACE_REPORT_OUT,
QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_) => SUBSPACE_QUOTA,
},
ValueClass::Report(_) => SUBSPACE_REPORT_OUT,
}
}

View file

@ -140,7 +140,10 @@ pub async fn query(client: &mut Client) {
(email::query::Filter::in_mailbox(Id::new(1768u64).to_string())),
(email::query::Filter::cc("canvas")),
]),
vec![email::query::Comparator::from()],
vec![
email::query::Comparator::from(),
email::query::Comparator::sent_at(),
],
vec!["T01882", "N04689", "T00925", "N00121"],
),
(
@ -156,7 +159,10 @@ pub async fn query(client: &mut Client) {
Id::new(1963).to_string(),
])),
]),
vec![email::query::Comparator::subject()],
vec![
email::query::Comparator::subject(),
email::query::Comparator::sent_at(),
],
vec![
"T10330", "N01744", "N01743", "N04885", "N02688", "N02122", "A00059", "A00058",
"N02123", "T00651", "T09439", "N05001", "T05848", "T05508",
@ -168,7 +174,10 @@ pub async fn query(client: &mut Client) {
Filter::not(vec![(email::query::Filter::from("collins"))]),
(email::query::Filter::body("bequeathed")).into(),
]),
vec![email::query::Comparator::subject()],
vec![
email::query::Comparator::subject(),
email::query::Comparator::sent_at(),
],
vec![
"N02640", "A01020", "N01250", "T03430", "N01800", "N00620", "N05250", "N04630",
"A01040",
@ -176,7 +185,10 @@ pub async fn query(client: &mut Client) {
),
(
email::query::Filter::not_keyword("artist").into(),
vec![email::query::Comparator::subject()],
vec![
email::query::Comparator::subject(),
email::query::Comparator::sent_at(),
],
vec!["T08626", "T09334", "T09455", "N01737", "T10965"],
),
(
@ -185,7 +197,10 @@ pub async fn query(client: &mut Client) {
(email::query::Filter::before(1972)),
(email::query::Filter::text("colour")),
]),
vec![email::query::Comparator::from()],
vec![
email::query::Comparator::from(),
email::query::Comparator::sent_at(),
],
vec!["T01745", "P01436", "P01437"],
),
(
@ -210,7 +225,10 @@ pub async fn query(client: &mut Client) {
(email::query::Filter::all_in_thread_have_keyword("N")),
(email::query::Filter::before(1800)),
]),
vec![email::query::Comparator::from()],
vec![
email::query::Comparator::from(),
email::query::Comparator::sent_at(),
],
vec![
"N01496", "N05916", "N01046", "N00675", "N01320", "N01321", "N00273", "N01453",
"N02984",
@ -235,7 +253,10 @@ pub async fn query(client: &mut Client) {
(email::query::Filter::some_in_thread_have_keyword("Bronze")),
(email::query::Filter::before(1878)),
]),
vec![email::query::Comparator::from()],
vec![
email::query::Comparator::from(),
email::query::Comparator::sent_at(),
],
vec![
"N04326", "N01610", "N02920", "N01587", "T00167", "T00168", "N01554", "N01535",
"N01536", "N01622", "N01754", "N01594",

View file

@ -178,8 +178,8 @@ impl QueueReceiver {
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
events.push(QueueEvent {
due: key.deserialize_be_u64(1)?,
queue_id: key.deserialize_be_u64(U64_LEN + 1)?,
due: key.deserialize_be_u64(0)?,
queue_id: key.deserialize_be_u64(U64_LEN)?,
});
Ok(true)
},
@ -200,7 +200,7 @@ impl QueueReceiver {
IterateParams::new(from_key, to_key).descending(),
|key, value| {
let value = Bincode::<Message>::deserialize(value)?;
assert_eq!(key.deserialize_be_u64(1)?, value.inner.id);
assert_eq!(key.deserialize_be_u64(0)?, value.inner.id);
messages.push(value.inner);
Ok(true)
},