mirror of
https://github.com/stalwartlabs/mail-server.git
synced 2026-01-06 23:15:31 +08:00
Store RoaringBitmaps in PostgreSQL
This commit is contained in:
parent
f989f4f750
commit
fafb1f3ff0
7 changed files with 747 additions and 94 deletions
|
|
@ -74,7 +74,12 @@ impl PostgresStore {
|
|||
pub(super) async fn create_tables(&self) -> crate::Result<()> {
|
||||
let conn = self.conn_pool.get().await?;
|
||||
|
||||
for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] {
|
||||
for table in [
|
||||
SUBSPACE_VALUES,
|
||||
SUBSPACE_LOGS,
|
||||
SUBSPACE_BLOBS,
|
||||
SUBSPACE_BITMAPS,
|
||||
] {
|
||||
let table = char::from(table);
|
||||
conn.execute(
|
||||
&format!(
|
||||
|
|
@ -88,7 +93,7 @@ impl PostgresStore {
|
|||
.await?;
|
||||
}
|
||||
|
||||
for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] {
|
||||
for table in [SUBSPACE_INDEXES] {
|
||||
let table = char::from(table);
|
||||
conn.execute(
|
||||
&format!(
|
||||
|
|
|
|||
|
|
@ -25,8 +25,9 @@ use futures::{pin_mut, TryStreamExt};
|
|||
use roaring::RoaringBitmap;
|
||||
|
||||
use crate::{
|
||||
write::{key::DeserializeBigEndian, BitmapClass, ValueClass},
|
||||
BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN,
|
||||
backend::rocksdb::bitmap::deserialize_bitmap,
|
||||
write::{BitmapClass, ValueClass},
|
||||
BitmapKey, Deserialize, IterateParams, Key, ValueKey, WITHOUT_BLOCK_NUM,
|
||||
};
|
||||
|
||||
use super::PostgresStore;
|
||||
|
|
@ -58,29 +59,26 @@ impl PostgresStore {
|
|||
|
||||
pub(crate) async fn get_bitmap(
|
||||
&self,
|
||||
mut key: BitmapKey<BitmapClass>,
|
||||
key: BitmapKey<BitmapClass>,
|
||||
) -> crate::Result<Option<RoaringBitmap>> {
|
||||
let begin = key.serialize(0);
|
||||
key.block_num = u32::MAX;
|
||||
let key_len = begin.len();
|
||||
let end = key.serialize(0);
|
||||
let conn = self.conn_pool.get().await?;
|
||||
|
||||
let mut bm = RoaringBitmap::new();
|
||||
let s = conn
|
||||
.prepare_cached("SELECT k FROM b WHERE k >= $1 AND k <= $2")
|
||||
.await?;
|
||||
let rows = conn.query_raw(&s, &[&begin, &end]).await?;
|
||||
|
||||
pin_mut!(rows);
|
||||
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
let key: &[u8] = row.try_get(0)?;
|
||||
if key.len() == key_len {
|
||||
bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?);
|
||||
}
|
||||
}
|
||||
Ok(if !bm.is_empty() { Some(bm) } else { None })
|
||||
let s = conn.prepare_cached("SELECT v FROM b WHERE k = $1").await?;
|
||||
let key = key.serialize(WITHOUT_BLOCK_NUM);
|
||||
conn.query_opt(&s, &[&key])
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
.and_then(|r| {
|
||||
if let Some(r) = r {
|
||||
let bm = deserialize_bitmap(r.get(0))?;
|
||||
if !bm.is_empty() {
|
||||
Ok(Some(bm))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn iterate<T: Key>(
|
||||
|
|
|
|||
155
crates/store/src/backend/postgres/read_dense.rs
Normal file
155
crates/store/src/backend/postgres/read_dense.rs
Normal file
|
|
@ -0,0 +1,155 @@
|
|||
/*
|
||||
* Copyright (c) 2023 Stalwart Labs Ltd.
|
||||
*
|
||||
* This file is part of the Stalwart Mail Server.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of
|
||||
* the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
* in the LICENSE file at the top-level directory of this distribution.
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* You can be released from the requirements of the AGPLv3 license by
|
||||
* purchasing a commercial license. Please contact licensing@stalw.art
|
||||
* for more details.
|
||||
*/
|
||||
|
||||
use futures::{pin_mut, TryStreamExt};
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
use crate::{
|
||||
write::{bitmap::DeserializeBlock, key::DeserializeBigEndian, BitmapClass, ValueClass},
|
||||
BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN,
|
||||
};
|
||||
|
||||
use super::PostgresStore;
|
||||
|
||||
impl PostgresStore {
|
||||
pub(crate) async fn get_value<U>(&self, key: impl Key) -> crate::Result<Option<U>>
|
||||
where
|
||||
U: Deserialize + 'static,
|
||||
{
|
||||
let conn = self.conn_pool.get().await?;
|
||||
let s = conn
|
||||
.prepare_cached(&format!(
|
||||
"SELECT v FROM {} WHERE k = $1",
|
||||
char::from(key.subspace())
|
||||
))
|
||||
.await?;
|
||||
let key = key.serialize(0);
|
||||
conn.query_opt(&s, &[&key])
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
.and_then(|r| {
|
||||
if let Some(r) = r {
|
||||
Ok(Some(U::deserialize(r.get(0))?))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn get_bitmap(
|
||||
&self,
|
||||
mut key: BitmapKey<BitmapClass>,
|
||||
) -> crate::Result<Option<RoaringBitmap>> {
|
||||
let begin = key.serialize(0);
|
||||
key.block_num = u32::MAX;
|
||||
let key_len = begin.len();
|
||||
let end = key.serialize(0);
|
||||
let conn = self.conn_pool.get().await?;
|
||||
|
||||
let mut bm = RoaringBitmap::new();
|
||||
let s = conn
|
||||
.prepare_cached("SELECT k, v FROM b WHERE k >= $1 AND k <= $2")
|
||||
.await?;
|
||||
let rows = conn.query_raw(&s, &[&begin, &end]).await?;
|
||||
|
||||
pin_mut!(rows);
|
||||
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
let key: &[u8] = row.try_get(0)?;
|
||||
if key.len() == key_len {
|
||||
let value: &[u8] = row.try_get(0)?;
|
||||
bm.deserialize_block(value, key.deserialize_be_u32(key.len() - U32_LEN)?);
|
||||
}
|
||||
}
|
||||
Ok(if !bm.is_empty() { Some(bm) } else { None })
|
||||
}
|
||||
|
||||
pub(crate) async fn iterate<T: Key>(
|
||||
&self,
|
||||
params: IterateParams<T>,
|
||||
mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result<bool> + Sync + Send,
|
||||
) -> crate::Result<()> {
|
||||
let conn = self.conn_pool.get().await?;
|
||||
let table = char::from(params.begin.subspace());
|
||||
let begin = params.begin.serialize(0);
|
||||
let end = params.end.serialize(0);
|
||||
let keys = if params.values { "k, v" } else { "k" };
|
||||
|
||||
let s = conn
|
||||
.prepare_cached(&match (params.first, params.ascending) {
|
||||
(true, true) => {
|
||||
format!(
|
||||
"SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k ASC LIMIT 1"
|
||||
)
|
||||
}
|
||||
(true, false) => {
|
||||
format!(
|
||||
"SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k DESC LIMIT 1"
|
||||
)
|
||||
}
|
||||
(false, true) => {
|
||||
format!("SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k ASC")
|
||||
}
|
||||
(false, false) => {
|
||||
format!("SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k DESC")
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
let rows = conn.query_raw(&s, &[&begin, &end]).await?;
|
||||
|
||||
pin_mut!(rows);
|
||||
|
||||
if params.values {
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
let key = row.try_get::<_, &[u8]>(0)?;
|
||||
let value = row.try_get::<_, &[u8]>(1)?;
|
||||
|
||||
if !cb(key, value)? {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
while let Some(row) = rows.try_next().await? {
|
||||
if !cb(row.try_get::<_, &[u8]>(0)?, b"")? {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn get_counter(
|
||||
&self,
|
||||
key: impl Into<ValueKey<ValueClass>> + Sync + Send,
|
||||
) -> crate::Result<i64> {
|
||||
let key = key.into().serialize(0);
|
||||
let conn = self.conn_pool.get().await?;
|
||||
let s = conn.prepare_cached("SELECT v FROM c WHERE k = $1").await?;
|
||||
match conn.query_opt(&s, &[&key]).await {
|
||||
Ok(Some(row)) => row.try_get(0).map_err(Into::into),
|
||||
Ok(None) => Ok(0),
|
||||
Err(e) => Err(e.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -21,18 +21,21 @@
|
|||
* for more details.
|
||||
*/
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use ahash::AHashMap;
|
||||
use deadpool_postgres::Object;
|
||||
use rand::Rng;
|
||||
use roaring::RoaringBitmap;
|
||||
use tokio_postgres::{error::SqlState, IsolationLevel};
|
||||
|
||||
use crate::{
|
||||
write::{
|
||||
Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
|
||||
},
|
||||
BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_COUNTERS,
|
||||
backend::rocksdb::bitmap::deserialize_bitmap,
|
||||
write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME},
|
||||
BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_COUNTERS, WITHOUT_BLOCK_NUM,
|
||||
};
|
||||
|
||||
use super::PostgresStore;
|
||||
|
|
@ -54,7 +57,9 @@ impl PostgresStore {
|
|||
}
|
||||
Err(err) => match err.code() {
|
||||
Some(
|
||||
&SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::T_R_DEADLOCK_DETECTED,
|
||||
&SqlState::T_R_SERIALIZATION_FAILURE
|
||||
| &SqlState::T_R_DEADLOCK_DETECTED
|
||||
| &SqlState::UNIQUE_VIOLATION,
|
||||
) if retry_count < MAX_COMMIT_ATTEMPTS && start.elapsed() < MAX_COMMIT_TIME => {
|
||||
let backoff = rand::thread_rng().gen_range(50..=300);
|
||||
tokio::time::sleep(Duration::from_millis(backoff)).await;
|
||||
|
|
@ -84,6 +89,141 @@ impl PostgresStore {
|
|||
.start()
|
||||
.await?;
|
||||
|
||||
// Sort the operations by key to avoid deadlocks
|
||||
let mut assert_values = BTreeMap::new();
|
||||
let mut bitmap_updates = BTreeMap::new();
|
||||
let mut advisory_locks = BTreeSet::new();
|
||||
for op in &batch.ops {
|
||||
match op {
|
||||
Operation::AccountId {
|
||||
account_id: account_id_,
|
||||
} => {
|
||||
account_id = *account_id_;
|
||||
}
|
||||
Operation::Collection {
|
||||
collection: collection_,
|
||||
} => {
|
||||
collection = *collection_;
|
||||
}
|
||||
Operation::DocumentId {
|
||||
document_id: document_id_,
|
||||
} => {
|
||||
document_id = *document_id_;
|
||||
}
|
||||
Operation::AssertValue {
|
||||
class,
|
||||
assert_value,
|
||||
} => {
|
||||
let key = ValueKey {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
class,
|
||||
};
|
||||
let table = char::from(key.subspace());
|
||||
assert_values.insert(key.serialize(0), (table, assert_value));
|
||||
if account_id != u32::MAX {
|
||||
advisory_locks.insert((account_id as u64) << 32 | collection as u64);
|
||||
}
|
||||
}
|
||||
Operation::Bitmap { class, set } => {
|
||||
bitmap_updates
|
||||
.entry(
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection,
|
||||
class,
|
||||
block_num: 0,
|
||||
}
|
||||
.serialize(WITHOUT_BLOCK_NUM),
|
||||
)
|
||||
.or_insert_with(Vec::new)
|
||||
.push((*set, document_id));
|
||||
if account_id != u32::MAX {
|
||||
advisory_locks.insert((account_id as u64) << 32 | collection as u64);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Acquire advisory locks
|
||||
for lock in advisory_locks {
|
||||
trx.execute("SELECT pg_advisory_xact_lock($1)", &[&(lock as i64)])
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Assert values
|
||||
for (key, (table, assert_value)) in assert_values {
|
||||
let s = trx
|
||||
.prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table))
|
||||
.await?;
|
||||
let (exists, matches) = trx
|
||||
.query_opt(&s, &[&key])
|
||||
.await?
|
||||
.map(|row| {
|
||||
row.try_get::<_, &[u8]>(0)
|
||||
.map_or((true, false), |v| (true, assert_value.matches(v)))
|
||||
})
|
||||
.unwrap_or_else(|| (false, assert_value.is_none()));
|
||||
if !matches {
|
||||
return Ok(false);
|
||||
}
|
||||
asserted_values.insert(key, exists);
|
||||
}
|
||||
|
||||
// Update bitmaps
|
||||
for (key, changes) in bitmap_updates {
|
||||
let s = trx
|
||||
.prepare_cached("SELECT v FROM b WHERE k = $1 FOR UPDATE")
|
||||
.await?;
|
||||
let (value_exists, mut bm) = match trx
|
||||
.query_opt(&s, &[&key])
|
||||
.await?
|
||||
.map(|r| deserialize_bitmap(r.get(0)))
|
||||
{
|
||||
Some(Ok(bm)) => (true, bm),
|
||||
None => (false, RoaringBitmap::new()),
|
||||
Some(Err(e)) => {
|
||||
tracing::error!("Failed to deserialize bitmap: {:?}", e);
|
||||
return Ok(false);
|
||||
}
|
||||
};
|
||||
|
||||
let mut has_changes = false;
|
||||
for (set, document_id) in changes {
|
||||
if set {
|
||||
if bm.insert(document_id) {
|
||||
has_changes = true;
|
||||
}
|
||||
} else if bm.remove(document_id) {
|
||||
has_changes = true;
|
||||
}
|
||||
}
|
||||
|
||||
if has_changes {
|
||||
if !bm.is_empty() {
|
||||
let mut bytes = Vec::with_capacity(bm.serialized_size() + 1);
|
||||
let _ = bm.serialize_into(&mut bytes);
|
||||
let s = if value_exists {
|
||||
trx.prepare_cached("UPDATE b SET v = $2 WHERE k = $1")
|
||||
.await?
|
||||
} else {
|
||||
trx.prepare_cached("INSERT INTO b (k, V) VALUES ($1, $2)")
|
||||
.await?
|
||||
};
|
||||
trx.execute(&s, &[&key, &bytes]).await?;
|
||||
} else if value_exists {
|
||||
let s = trx.prepare_cached("DELETE FROM b WHERE k = $1").await?;
|
||||
trx.execute(&s, &[&key]).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Apply the operations
|
||||
account_id = u32::MAX;
|
||||
collection = u8::MAX;
|
||||
document_id = u32::MAX;
|
||||
for op in &batch.ops {
|
||||
match op {
|
||||
Operation::AccountId {
|
||||
|
|
@ -168,20 +308,32 @@ impl PostgresStore {
|
|||
return Ok(false);
|
||||
}
|
||||
|
||||
if matches!(class, ValueClass::ReservedId) {
|
||||
/*if matches!(class, ValueClass::ReservedId) {
|
||||
// Make sure the reserved id is not already in use
|
||||
let s = trx.prepare_cached("SELECT 1 FROM b WHERE k = $1").await?;
|
||||
let s = trx.prepare_cached("SELECT v FROM b WHERE k = $1").await?;
|
||||
let key = BitmapKey {
|
||||
account_id,
|
||||
collection,
|
||||
class: BitmapClass::DocumentIds,
|
||||
block_num: document_id,
|
||||
}
|
||||
.serialize(0);
|
||||
if trx.query_opt(&s, &[&key]).await?.is_some() {
|
||||
return Ok(false);
|
||||
.serialize(WITHOUT_BLOCK_NUM);
|
||||
|
||||
match trx
|
||||
.query_opt(&s, &[&key])
|
||||
.await?
|
||||
.map(|r| deserialize_bitmap(r.get(0)))
|
||||
{
|
||||
Some(Ok(bm)) if bm.contains(document_id) => {
|
||||
return Ok(false);
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
tracing::error!("Failed to deserialize bitmap: {:?}", e);
|
||||
return Ok(false);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}*/
|
||||
} else {
|
||||
let s = trx
|
||||
.prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table))
|
||||
|
|
@ -209,29 +361,7 @@ impl PostgresStore {
|
|||
};
|
||||
trx.execute(&s, &[&key]).await?;
|
||||
}
|
||||
Operation::Bitmap { class, set } => {
|
||||
let key = BitmapKey {
|
||||
account_id,
|
||||
collection,
|
||||
class,
|
||||
block_num: document_id,
|
||||
}
|
||||
.serialize(0);
|
||||
|
||||
let s = if *set {
|
||||
if matches!(class, BitmapClass::DocumentIds) {
|
||||
trx.prepare_cached("INSERT INTO b (k) VALUES ($1)").await?
|
||||
} else {
|
||||
trx.prepare_cached(
|
||||
"INSERT INTO b (k) VALUES ($1) ON CONFLICT (k) DO NOTHING",
|
||||
)
|
||||
.await?
|
||||
}
|
||||
} else {
|
||||
trx.prepare_cached("DELETE FROM b WHERE k = $1").await?
|
||||
};
|
||||
trx.execute(&s, &[&key]).await?;
|
||||
}
|
||||
Operation::Log {
|
||||
collection,
|
||||
change_id,
|
||||
|
|
@ -252,35 +382,7 @@ impl PostgresStore {
|
|||
.await?;
|
||||
trx.execute(&s, &[&key, set]).await?;
|
||||
}
|
||||
Operation::AssertValue {
|
||||
class,
|
||||
assert_value,
|
||||
} => {
|
||||
let key = ValueKey {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
class,
|
||||
};
|
||||
let table = char::from(key.subspace());
|
||||
let key = key.serialize(0);
|
||||
|
||||
let s = trx
|
||||
.prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table))
|
||||
.await?;
|
||||
let (exists, matches) = trx
|
||||
.query_opt(&s, &[&key])
|
||||
.await?
|
||||
.map(|row| {
|
||||
row.try_get::<_, &[u8]>(0)
|
||||
.map_or((true, false), |v| (true, assert_value.matches(v)))
|
||||
})
|
||||
.unwrap_or_else(|| (false, assert_value.is_none()));
|
||||
if !matches {
|
||||
return Ok(false);
|
||||
}
|
||||
asserted_values.insert(key, exists);
|
||||
}
|
||||
Operation::Bitmap { .. } | Operation::AssertValue { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
386
crates/store/src/backend/postgres/write_dense.rs
Normal file
386
crates/store/src/backend/postgres/write_dense.rs
Normal file
|
|
@ -0,0 +1,386 @@
|
|||
/*
|
||||
* Copyright (c) 2023 Stalwart Labs Ltd.
|
||||
*
|
||||
* This file is part of the Stalwart Mail Server.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of
|
||||
* the License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
* in the LICENSE file at the top-level directory of this distribution.
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* You can be released from the requirements of the AGPLv3 license by
|
||||
* purchasing a commercial license. Please contact licensing@stalw.art
|
||||
* for more details.
|
||||
*/
|
||||
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use ahash::AHashMap;
|
||||
use deadpool_postgres::Object;
|
||||
use rand::Rng;
|
||||
use tokio_postgres::{error::SqlState, IsolationLevel};
|
||||
|
||||
use crate::{
|
||||
write::{
|
||||
bitmap::{block_contains, DenseBitmap},
|
||||
Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME,
|
||||
},
|
||||
BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_COUNTERS,
|
||||
};
|
||||
|
||||
use super::PostgresStore;
|
||||
|
||||
impl PostgresStore {
|
||||
pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
|
||||
let mut conn = self.conn_pool.get().await?;
|
||||
let start = Instant::now();
|
||||
let mut retry_count = 0;
|
||||
|
||||
loop {
|
||||
match self.write_trx(&mut conn, &batch).await {
|
||||
Ok(success) => {
|
||||
return if success {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(crate::Error::AssertValueFailed)
|
||||
};
|
||||
}
|
||||
Err(err) => match err.code() {
|
||||
Some(
|
||||
&SqlState::T_R_SERIALIZATION_FAILURE
|
||||
| &SqlState::T_R_DEADLOCK_DETECTED
|
||||
| &SqlState::UNIQUE_VIOLATION,
|
||||
) if retry_count < MAX_COMMIT_ATTEMPTS && start.elapsed() < MAX_COMMIT_TIME => {
|
||||
let backoff = rand::thread_rng().gen_range(50..=300);
|
||||
tokio::time::sleep(Duration::from_millis(backoff)).await;
|
||||
retry_count += 1;
|
||||
}
|
||||
Some(&SqlState::UNIQUE_VIOLATION) => {
|
||||
return Err(crate::Error::AssertValueFailed);
|
||||
}
|
||||
_ => return Err(err.into()),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_trx(
|
||||
&self,
|
||||
conn: &mut Object,
|
||||
batch: &Batch,
|
||||
) -> Result<bool, tokio_postgres::Error> {
|
||||
let mut account_id = u32::MAX;
|
||||
let mut collection = u8::MAX;
|
||||
let mut document_id = u32::MAX;
|
||||
let mut asserted_values = AHashMap::new();
|
||||
let trx = conn
|
||||
.build_transaction()
|
||||
.isolation_level(IsolationLevel::ReadCommitted)
|
||||
.start()
|
||||
.await?;
|
||||
|
||||
// Sort the operations by key to avoid deadlocks
|
||||
let mut assert_values = BTreeMap::new();
|
||||
let mut bitmap_updates = BTreeMap::new();
|
||||
for op in &batch.ops {
|
||||
match op {
|
||||
Operation::AccountId {
|
||||
account_id: account_id_,
|
||||
} => {
|
||||
account_id = *account_id_;
|
||||
}
|
||||
Operation::Collection {
|
||||
collection: collection_,
|
||||
} => {
|
||||
collection = *collection_;
|
||||
}
|
||||
Operation::DocumentId {
|
||||
document_id: document_id_,
|
||||
} => {
|
||||
document_id = *document_id_;
|
||||
}
|
||||
Operation::AssertValue {
|
||||
class,
|
||||
assert_value,
|
||||
} => {
|
||||
let key = ValueKey {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
class,
|
||||
};
|
||||
let table = char::from(key.subspace());
|
||||
assert_values.insert(key.serialize(0), (table, assert_value));
|
||||
}
|
||||
Operation::Bitmap { class, set } => {
|
||||
bitmap_updates
|
||||
.entry(
|
||||
BitmapKey {
|
||||
account_id,
|
||||
collection,
|
||||
class,
|
||||
block_num: DenseBitmap::block_num(document_id),
|
||||
}
|
||||
.serialize(0),
|
||||
)
|
||||
.or_insert_with(Vec::new)
|
||||
.push((*set, document_id));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Assert values
|
||||
for (key, (table, assert_value)) in assert_values {
|
||||
let s = trx
|
||||
.prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table))
|
||||
.await?;
|
||||
let (exists, matches) = trx
|
||||
.query_opt(&s, &[&key])
|
||||
.await?
|
||||
.map(|row| {
|
||||
row.try_get::<_, &[u8]>(0)
|
||||
.map_or((true, false), |v| (true, assert_value.matches(v)))
|
||||
})
|
||||
.unwrap_or_else(|| (false, assert_value.is_none()));
|
||||
if !matches {
|
||||
return Ok(false);
|
||||
}
|
||||
asserted_values.insert(key, exists);
|
||||
}
|
||||
|
||||
// Update bitmaps
|
||||
for (key, changes) in bitmap_updates {
|
||||
// Try updating the bitmap first
|
||||
let mut update_query = String::from("v");
|
||||
let mut has_inserts = false;
|
||||
for (set, document_id) in &changes {
|
||||
update_query = format!(
|
||||
"set_bit({update_query},{},{})",
|
||||
DenseBitmap::block_index(*document_id),
|
||||
*set as i8
|
||||
);
|
||||
has_inserts = has_inserts || *set;
|
||||
}
|
||||
|
||||
let s = trx
|
||||
.prepare(&format!("UPDATE b SET v = {update_query} WHERE k = $1"))
|
||||
.await?;
|
||||
if trx.execute(&s, &[&key]).await? == 0 && has_inserts {
|
||||
// The bitmap does not exist, create it
|
||||
let mut dense_bm = DenseBitmap::empty();
|
||||
for (set, document_id) in changes {
|
||||
if set {
|
||||
dense_bm.set(document_id);
|
||||
}
|
||||
}
|
||||
let s = trx
|
||||
.prepare(&format!(
|
||||
"INSERT INTO b (k, v) VALUES ($1, $2) ON CONFLICT(k) DO UPDATE SET v = {}",
|
||||
update_query.replace("(v,", "(b.v,")
|
||||
))
|
||||
.await?;
|
||||
trx.execute(&s, &[&key, &&dense_bm.bitmap[..]]).await?;
|
||||
}
|
||||
}
|
||||
|
||||
// Apply the operations
|
||||
account_id = u32::MAX;
|
||||
collection = u8::MAX;
|
||||
document_id = u32::MAX;
|
||||
for op in &batch.ops {
|
||||
match op {
|
||||
Operation::AccountId {
|
||||
account_id: account_id_,
|
||||
} => {
|
||||
account_id = *account_id_;
|
||||
}
|
||||
Operation::Collection {
|
||||
collection: collection_,
|
||||
} => {
|
||||
collection = *collection_;
|
||||
}
|
||||
Operation::DocumentId {
|
||||
document_id: document_id_,
|
||||
} => {
|
||||
document_id = *document_id_;
|
||||
}
|
||||
Operation::Value {
|
||||
class,
|
||||
op: ValueOp::Add(by),
|
||||
} => {
|
||||
let key = ValueKey {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
class,
|
||||
}
|
||||
.serialize(0);
|
||||
|
||||
if *by >= 0 {
|
||||
let s = trx
|
||||
.prepare_cached(concat!(
|
||||
"INSERT INTO c (k, v) VALUES ($1, $2) ",
|
||||
"ON CONFLICT(k) DO UPDATE SET v = c.v + EXCLUDED.v"
|
||||
))
|
||||
.await?;
|
||||
trx.execute(&s, &[&key, &by]).await?;
|
||||
} else {
|
||||
let s = trx
|
||||
.prepare_cached("UPDATE c SET v = v + $1 WHERE k = $2")
|
||||
.await?;
|
||||
trx.execute(&s, &[&by, &key]).await?;
|
||||
}
|
||||
}
|
||||
Operation::Value { class, op } => {
|
||||
let key = ValueKey {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
class,
|
||||
};
|
||||
let table = char::from(key.subspace());
|
||||
let key = key.serialize(0);
|
||||
|
||||
if let ValueOp::Set(value) = op {
|
||||
let s = if let Some(exists) = asserted_values.get(&key) {
|
||||
if *exists {
|
||||
trx.prepare_cached(&format!(
|
||||
"UPDATE {} SET v = $2 WHERE k = $1",
|
||||
table
|
||||
))
|
||||
.await?
|
||||
} else {
|
||||
trx.prepare_cached(&format!(
|
||||
"INSERT INTO {} (k, v) VALUES ($1, $2)",
|
||||
table
|
||||
))
|
||||
.await?
|
||||
}
|
||||
} else {
|
||||
trx.prepare_cached(&format!(
|
||||
concat!(
|
||||
"INSERT INTO {} (k, v) VALUES ($1, $2) ",
|
||||
"ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v"
|
||||
),
|
||||
table
|
||||
))
|
||||
.await?
|
||||
};
|
||||
|
||||
if trx.execute(&s, &[&key, value]).await? == 0 {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
if matches!(class, ValueClass::ReservedId) {
|
||||
// Make sure the reserved id is not already in use
|
||||
let block_num = DenseBitmap::block_num(document_id);
|
||||
let s = trx.prepare_cached("SELECT v FROM b WHERE k = $1").await?;
|
||||
let key = BitmapKey {
|
||||
account_id,
|
||||
collection,
|
||||
class: BitmapClass::DocumentIds,
|
||||
block_num,
|
||||
}
|
||||
.serialize(0);
|
||||
|
||||
if let Some(row) = trx.query_opt(&s, &[&key]).await? {
|
||||
if block_contains(row.get(0), block_num, document_id) {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let s = trx
|
||||
.prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table))
|
||||
.await?;
|
||||
trx.execute(&s, &[&key]).await?;
|
||||
}
|
||||
}
|
||||
Operation::Index { field, key, set } => {
|
||||
let key = IndexKey {
|
||||
account_id,
|
||||
collection,
|
||||
document_id,
|
||||
field: *field,
|
||||
key,
|
||||
}
|
||||
.serialize(0);
|
||||
|
||||
let s = if *set {
|
||||
trx.prepare_cached(
|
||||
"INSERT INTO i (k) VALUES ($1) ON CONFLICT (k) DO NOTHING",
|
||||
)
|
||||
.await?
|
||||
} else {
|
||||
trx.prepare_cached("DELETE FROM i WHERE k = $1").await?
|
||||
};
|
||||
trx.execute(&s, &[&key]).await?;
|
||||
}
|
||||
|
||||
Operation::Log {
|
||||
collection,
|
||||
change_id,
|
||||
set,
|
||||
} => {
|
||||
let key = LogKey {
|
||||
account_id,
|
||||
collection: *collection,
|
||||
change_id: *change_id,
|
||||
}
|
||||
.serialize(0);
|
||||
|
||||
let s = trx
|
||||
.prepare_cached(concat!(
|
||||
"INSERT INTO l (k, v) VALUES ($1, $2) ",
|
||||
"ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v"
|
||||
))
|
||||
.await?;
|
||||
trx.execute(&s, &[&key, set]).await?;
|
||||
}
|
||||
Operation::Bitmap { .. } | Operation::AssertValue { .. } => {}
|
||||
}
|
||||
}
|
||||
|
||||
trx.commit().await.map(|_| true)
|
||||
}
|
||||
|
||||
pub(crate) async fn purge_store(&self) -> crate::Result<()> {
|
||||
let todo = "delete bitmaps";
|
||||
let conn = self.conn_pool.get().await?;
|
||||
|
||||
let s = conn
|
||||
.prepare_cached(&format!(
|
||||
"DELETE FROM {} WHERE v = 0",
|
||||
char::from(SUBSPACE_COUNTERS),
|
||||
))
|
||||
.await?;
|
||||
conn.execute(&s, &[]).await.map(|_| ()).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Deletes every row whose key is in the half-open range `[from, to)`.
///
/// The table is the one named by the subspace of `from`; the subspace of
/// `to` is not consulted.
pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> {
    let conn = self.conn_pool.get().await?;

    let table = char::from(from.subspace());
    let statement = conn
        .prepare_cached(&format!(
            "DELETE FROM {} WHERE k >= $1 AND k < $2",
            table,
        ))
        .await?;

    let lower = from.serialize(0);
    let upper = to.serialize(0);
    conn.execute(&statement, &[&lower, &upper]).await?;
    Ok(())
}
|
||||
}
|
||||
|
|
@ -58,8 +58,10 @@ pub fn deserialize_bitlist(bm: &mut RoaringBitmap, bytes: &[u8]) {
|
|||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn deserialize_bitmap(bytes: &[u8]) -> Option<RoaringBitmap> {
|
||||
RoaringBitmap::deserialize_unchecked_from(&bytes[1..]).ok()
|
||||
pub fn deserialize_bitmap(bytes: &[u8]) -> crate::Result<RoaringBitmap> {
|
||||
RoaringBitmap::deserialize_unchecked_from(bytes).map_err(|err| {
|
||||
crate::Error::InternalError(format!("Failed to deserialize bitmap: {}", err))
|
||||
})
|
||||
}
|
||||
|
||||
impl Deserialize for RoaringBitmap {
|
||||
|
|
@ -68,9 +70,7 @@ impl Deserialize for RoaringBitmap {
|
|||
.first()
|
||||
.ok_or_else(|| crate::Error::InternalError("Empty bitmap".to_string()))?
|
||||
{
|
||||
IS_BITMAP => deserialize_bitmap(bytes).ok_or_else(|| {
|
||||
crate::Error::InternalError("Failed to deserialize bitmap".to_string())
|
||||
}),
|
||||
IS_BITMAP => deserialize_bitmap(&bytes[1..]),
|
||||
IS_BITLIST => {
|
||||
let mut bm = RoaringBitmap::new();
|
||||
deserialize_bitlist(&mut bm, bytes);
|
||||
|
|
@ -220,7 +220,7 @@ pub fn bitmap_merge<'x>(
|
|||
for op in operands.into_iter() {
|
||||
match *op.first()? {
|
||||
IS_BITMAP => {
|
||||
if let Some(union_bm) = deserialize_bitmap(op) {
|
||||
if let Ok(union_bm) = deserialize_bitmap(&op[1..]) {
|
||||
if !bm.is_empty() {
|
||||
bm |= union_bm;
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -65,9 +65,16 @@ impl DenseBitmap {
|
|||
self.bitmap[(index / 8) as usize] &= !(1 << (index & 7));
|
||||
}
|
||||
|
||||
/// Returns `index / BITS_PER_BLOCK_L`.
///
/// NOTE(review): presumably the number of the block that holds bit `index`;
/// `BITS_PER_BLOCK_L` is defined outside this view, so confirm there.
#[inline(always)]
|
||||
pub fn block_num(index: u32) -> u32 {
|
||||
index / BITS_PER_BLOCK_L
|
||||
}
|
||||
|
||||
/// Returns `index & BITS_MASK_L`.
///
/// NOTE(review): presumably the position of bit `index` inside its block,
/// which assumes `BITS_MASK_L` is `BITS_PER_BLOCK_L - 1`; the constant is
/// defined outside this view, so confirm there.
#[inline(always)]
|
||||
pub fn block_index(index: u32) -> u32 {
|
||||
index & BITS_MASK_L
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub trait DeserializeBlock {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue