From fafb1f3ff0f67d875936cbb86f9f24cce174db80 Mon Sep 17 00:00:00 2001 From: mdecimus Date: Sun, 3 Mar 2024 11:34:10 +0100 Subject: [PATCH] Store RoaringBitmaps in PostgreSQL --- crates/store/src/backend/postgres/main.rs | 9 +- crates/store/src/backend/postgres/read.rs | 44 +- .../store/src/backend/postgres/read_dense.rs | 155 +++++++ crates/store/src/backend/postgres/write.rs | 228 ++++++++--- .../store/src/backend/postgres/write_dense.rs | 386 ++++++++++++++++++ crates/store/src/backend/rocksdb/bitmap.rs | 12 +- crates/store/src/write/bitmap.rs | 7 + 7 files changed, 747 insertions(+), 94 deletions(-) create mode 100644 crates/store/src/backend/postgres/read_dense.rs create mode 100644 crates/store/src/backend/postgres/write_dense.rs diff --git a/crates/store/src/backend/postgres/main.rs b/crates/store/src/backend/postgres/main.rs index 3757f4c4..fc7c10a6 100644 --- a/crates/store/src/backend/postgres/main.rs +++ b/crates/store/src/backend/postgres/main.rs @@ -74,7 +74,12 @@ impl PostgresStore { pub(super) async fn create_tables(&self) -> crate::Result<()> { let conn = self.conn_pool.get().await?; - for table in [SUBSPACE_VALUES, SUBSPACE_LOGS, SUBSPACE_BLOBS] { + for table in [ + SUBSPACE_VALUES, + SUBSPACE_LOGS, + SUBSPACE_BLOBS, + SUBSPACE_BITMAPS, + ] { let table = char::from(table); conn.execute( &format!( @@ -88,7 +93,7 @@ impl PostgresStore { .await?; } - for table in [SUBSPACE_INDEXES, SUBSPACE_BITMAPS] { + for table in [SUBSPACE_INDEXES] { let table = char::from(table); conn.execute( &format!( diff --git a/crates/store/src/backend/postgres/read.rs b/crates/store/src/backend/postgres/read.rs index 1b67db3a..8829d560 100644 --- a/crates/store/src/backend/postgres/read.rs +++ b/crates/store/src/backend/postgres/read.rs @@ -25,8 +25,9 @@ use futures::{pin_mut, TryStreamExt}; use roaring::RoaringBitmap; use crate::{ - write::{key::DeserializeBigEndian, BitmapClass, ValueClass}, - BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, + backend::rocksdb::bitmap::deserialize_bitmap, + write::{BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, WITHOUT_BLOCK_NUM, }; use super::PostgresStore; @@ -58,29 +59,26 @@ impl PostgresStore { pub(crate) async fn get_bitmap( &self, - mut key: BitmapKey, + key: BitmapKey, ) -> crate::Result> { - let begin = key.serialize(0); - key.block_num = u32::MAX; - let key_len = begin.len(); - let end = key.serialize(0); let conn = self.conn_pool.get().await?; - - let mut bm = RoaringBitmap::new(); - let s = conn - .prepare_cached("SELECT k FROM b WHERE k >= $1 AND k <= $2") - .await?; - let rows = conn.query_raw(&s, &[&begin, &end]).await?; - - pin_mut!(rows); - - while let Some(row) = rows.try_next().await? { - let key: &[u8] = row.try_get(0)?; - if key.len() == key_len { - bm.insert(key.deserialize_be_u32(key.len() - U32_LEN)?); - } - } - Ok(if !bm.is_empty() { Some(bm) } else { None }) + let s = conn.prepare_cached("SELECT v FROM b WHERE k = $1").await?; + let key = key.serialize(WITHOUT_BLOCK_NUM); + conn.query_opt(&s, &[&key]) + .await + .map_err(Into::into) + .and_then(|r| { + if let Some(r) = r { + let bm = deserialize_bitmap(r.get(0))?; + if !bm.is_empty() { + Ok(Some(bm)) + } else { + Ok(None) + } + } else { + Ok(None) + } + }) } pub(crate) async fn iterate( diff --git a/crates/store/src/backend/postgres/read_dense.rs b/crates/store/src/backend/postgres/read_dense.rs new file mode 100644 index 00000000..5d4da589 --- /dev/null +++ b/crates/store/src/backend/postgres/read_dense.rs @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of the Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use futures::{pin_mut, TryStreamExt}; +use roaring::RoaringBitmap; + +use crate::{ + write::{bitmap::DeserializeBlock, key::DeserializeBigEndian, BitmapClass, ValueClass}, + BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, +}; + +use super::PostgresStore; + +impl PostgresStore { + pub(crate) async fn get_value(&self, key: impl Key) -> crate::Result> + where + U: Deserialize + 'static, + { + let conn = self.conn_pool.get().await?; + let s = conn + .prepare_cached(&format!( + "SELECT v FROM {} WHERE k = $1", + char::from(key.subspace()) + )) + .await?; + let key = key.serialize(0); + conn.query_opt(&s, &[&key]) + .await + .map_err(Into::into) + .and_then(|r| { + if let Some(r) = r { + Ok(Some(U::deserialize(r.get(0))?)) + } else { + Ok(None) + } + }) + } + + pub(crate) async fn get_bitmap( + &self, + mut key: BitmapKey, + ) -> crate::Result> { + let begin = key.serialize(0); + key.block_num = u32::MAX; + let key_len = begin.len(); + let end = key.serialize(0); + let conn = self.conn_pool.get().await?; + + let mut bm = RoaringBitmap::new(); + let s = conn + .prepare_cached("SELECT k, v FROM b WHERE k >= $1 AND k <= $2") + .await?; + let rows = conn.query_raw(&s, &[&begin, &end]).await?; + + pin_mut!(rows); + + while let Some(row) = rows.try_next().await? { + let key: &[u8] = row.try_get(0)?; + if key.len() == key_len { + let value: &[u8] = row.try_get(0)?; + bm.deserialize_block(value, key.deserialize_be_u32(key.len() - U32_LEN)?); + } + } + Ok(if !bm.is_empty() { Some(bm) } else { None }) + } + + pub(crate) async fn iterate( + &self, + params: IterateParams, + mut cb: impl for<'x> FnMut(&'x [u8], &'x [u8]) -> crate::Result + Sync + Send, + ) -> crate::Result<()> { + let conn = self.conn_pool.get().await?; + let table = char::from(params.begin.subspace()); + let begin = params.begin.serialize(0); + let end = params.end.serialize(0); + let keys = if params.values { "k, v" } else { "k" }; + + let s = conn + .prepare_cached(&match (params.first, params.ascending) { + (true, true) => { + format!( + "SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k ASC LIMIT 1" + ) + } + (true, false) => { + format!( + "SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k DESC LIMIT 1" + ) + } + (false, true) => { + format!("SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k ASC") + } + (false, false) => { + format!("SELECT {keys} FROM {table} WHERE k >= $1 AND k <= $2 ORDER BY k DESC") + } + }) + .await?; + let rows = conn.query_raw(&s, &[&begin, &end]).await?; + + pin_mut!(rows); + + if params.values { + while let Some(row) = rows.try_next().await? { + let key = row.try_get::<_, &[u8]>(0)?; + let value = row.try_get::<_, &[u8]>(1)?; + + if !cb(key, value)? { + break; + } + } + } else { + while let Some(row) = rows.try_next().await? { + if !cb(row.try_get::<_, &[u8]>(0)?, b"")? { + break; + } + } + } + + Ok(()) + } + + pub(crate) async fn get_counter( + &self, + key: impl Into> + Sync + Send, + ) -> crate::Result { + let key = key.into().serialize(0); + let conn = self.conn_pool.get().await?; + let s = conn.prepare_cached("SELECT v FROM c WHERE k = $1").await?; + match conn.query_opt(&s, &[&key]).await { + Ok(Some(row)) => row.try_get(0).map_err(Into::into), + Ok(None) => Ok(0), + Err(e) => Err(e.into()), + } + } +} diff --git a/crates/store/src/backend/postgres/write.rs b/crates/store/src/backend/postgres/write.rs index fa86df72..ff1871cf 100644 --- a/crates/store/src/backend/postgres/write.rs +++ b/crates/store/src/backend/postgres/write.rs @@ -21,18 +21,21 @@ * for more details. */ -use std::time::{Duration, Instant}; +use std::{ + collections::{BTreeMap, BTreeSet}, + time::{Duration, Instant}, +}; use ahash::AHashMap; use deadpool_postgres::Object; use rand::Rng; +use roaring::RoaringBitmap; use tokio_postgres::{error::SqlState, IsolationLevel}; use crate::{ - write::{ - Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, - }, - BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_COUNTERS, + backend::rocksdb::bitmap::deserialize_bitmap, + write::{Batch, Operation, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME}, + BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_COUNTERS, WITHOUT_BLOCK_NUM, }; use super::PostgresStore; @@ -54,7 +57,9 @@ impl PostgresStore { } Err(err) => match err.code() { Some( - &SqlState::T_R_SERIALIZATION_FAILURE | &SqlState::T_R_DEADLOCK_DETECTED, + &SqlState::T_R_SERIALIZATION_FAILURE + | &SqlState::T_R_DEADLOCK_DETECTED + | &SqlState::UNIQUE_VIOLATION, ) if retry_count < MAX_COMMIT_ATTEMPTS && start.elapsed() < MAX_COMMIT_TIME => { let backoff = rand::thread_rng().gen_range(50..=300); tokio::time::sleep(Duration::from_millis(backoff)).await; @@ -84,6 +89,141 @@ impl PostgresStore { .start() .await?; + // Sort the operations by key to avoid deadlocks + let mut assert_values = BTreeMap::new(); + let mut bitmap_updates = BTreeMap::new(); + let mut advisory_locks = BTreeSet::new(); + for op in &batch.ops { + match op { + Operation::AccountId { + account_id: account_id_, + } => { + account_id = *account_id_; + } + Operation::Collection { + collection: collection_, + } => { + collection = *collection_; + } + Operation::DocumentId { + document_id: document_id_, + } => { + document_id = *document_id_; + } + Operation::AssertValue { + class, + assert_value, + } => { + let key = ValueKey { + account_id, + collection, + document_id, + class, + }; + let table = char::from(key.subspace()); + assert_values.insert(key.serialize(0), (table, assert_value)); + if account_id != u32::MAX { + advisory_locks.insert((account_id as u64) << 32 | collection as u64); + } + } + Operation::Bitmap { class, set } => { + bitmap_updates + .entry( + BitmapKey { + account_id, + collection, + class, + block_num: 0, + } + .serialize(WITHOUT_BLOCK_NUM), + ) + .or_insert_with(Vec::new) + .push((*set, document_id)); + if account_id != u32::MAX { + advisory_locks.insert((account_id as u64) << 32 | collection as u64); + } + } + _ => {} + } + } + + // Acquire advisory locks + for lock in advisory_locks { + trx.execute("SELECT pg_advisory_xact_lock($1)", &[&(lock as i64)]) + .await?; + } + + // Assert values + for (key, (table, assert_value)) in assert_values { + let s = trx + .prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table)) + .await?; + let (exists, matches) = trx + .query_opt(&s, &[&key]) + .await? + .map(|row| { + row.try_get::<_, &[u8]>(0) + .map_or((true, false), |v| (true, assert_value.matches(v))) + }) + .unwrap_or_else(|| (false, assert_value.is_none())); + if !matches { + return Ok(false); + } + asserted_values.insert(key, exists); + } + + // Update bitmaps + for (key, changes) in bitmap_updates { + let s = trx + .prepare_cached("SELECT v FROM b WHERE k = $1 FOR UPDATE") + .await?; + let (value_exists, mut bm) = match trx + .query_opt(&s, &[&key]) + .await? + .map(|r| deserialize_bitmap(r.get(0))) + { + Some(Ok(bm)) => (true, bm), + None => (false, RoaringBitmap::new()), + Some(Err(e)) => { + tracing::error!("Failed to deserialize bitmap: {:?}", e); + return Ok(false); + } + }; + + let mut has_changes = false; + for (set, document_id) in changes { + if set { + if bm.insert(document_id) { + has_changes = true; + } + } else if bm.remove(document_id) { + has_changes = true; + } + } + + if has_changes { + if !bm.is_empty() { + let mut bytes = Vec::with_capacity(bm.serialized_size() + 1); + let _ = bm.serialize_into(&mut bytes); + let s = if value_exists { + trx.prepare_cached("UPDATE b SET v = $2 WHERE k = $1") + .await? + } else { + trx.prepare_cached("INSERT INTO b (k, V) VALUES ($1, $2)") + .await? + }; + trx.execute(&s, &[&key, &bytes]).await?; + } else if value_exists { + let s = trx.prepare_cached("DELETE FROM b WHERE k = $1").await?; + trx.execute(&s, &[&key]).await?; + } + } + } + + // Apply the operations + account_id = u32::MAX; + collection = u8::MAX; + document_id = u32::MAX; for op in &batch.ops { match op { Operation::AccountId { @@ -168,20 +308,32 @@ impl PostgresStore { return Ok(false); } - if matches!(class, ValueClass::ReservedId) { + /*if matches!(class, ValueClass::ReservedId) { // Make sure the reserved id is not already in use - let s = trx.prepare_cached("SELECT 1 FROM b WHERE k = $1").await?; + let s = trx.prepare_cached("SELECT v FROM b WHERE k = $1").await?; let key = BitmapKey { account_id, collection, class: BitmapClass::DocumentIds, block_num: document_id, } - .serialize(0); - if trx.query_opt(&s, &[&key]).await?.is_some() { - return Ok(false); + .serialize(WITHOUT_BLOCK_NUM); + + match trx + .query_opt(&s, &[&key]) + .await? + .map(|r| deserialize_bitmap(r.get(0))) + { + Some(Ok(bm)) if bm.contains(document_id) => { + return Ok(false); + } + Some(Err(e)) => { + tracing::error!("Failed to deserialize bitmap: {:?}", e); + return Ok(false); + } + _ => {} } - } + }*/ } else { let s = trx .prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table)) @@ -209,29 +361,7 @@ impl PostgresStore { }; trx.execute(&s, &[&key]).await?; } - Operation::Bitmap { class, set } => { - let key = BitmapKey { - account_id, - collection, - class, - block_num: document_id, - } - .serialize(0); - let s = if *set { - if matches!(class, BitmapClass::DocumentIds) { - trx.prepare_cached("INSERT INTO b (k) VALUES ($1)").await? - } else { - trx.prepare_cached( - "INSERT INTO b (k) VALUES ($1) ON CONFLICT (k) DO NOTHING", - ) - .await? - } - } else { - trx.prepare_cached("DELETE FROM b WHERE k = $1").await? - }; - trx.execute(&s, &[&key]).await?; - } Operation::Log { collection, change_id, @@ -252,35 +382,7 @@ impl PostgresStore { .await?; trx.execute(&s, &[&key, set]).await?; } - Operation::AssertValue { - class, - assert_value, - } => { - let key = ValueKey { - account_id, - collection, - document_id, - class, - }; - let table = char::from(key.subspace()); - let key = key.serialize(0); - - let s = trx - .prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table)) - .await?; - let (exists, matches) = trx - .query_opt(&s, &[&key]) - .await? - .map(|row| { - row.try_get::<_, &[u8]>(0) - .map_or((true, false), |v| (true, assert_value.matches(v))) - }) - .unwrap_or_else(|| (false, assert_value.is_none())); - if !matches { - return Ok(false); - } - asserted_values.insert(key, exists); - } + Operation::Bitmap { .. } | Operation::AssertValue { .. } => {} } } diff --git a/crates/store/src/backend/postgres/write_dense.rs b/crates/store/src/backend/postgres/write_dense.rs new file mode 100644 index 00000000..91969f7b --- /dev/null +++ b/crates/store/src/backend/postgres/write_dense.rs @@ -0,0 +1,386 @@ +/* + * Copyright (c) 2023 Stalwart Labs Ltd. + * + * This file is part of the Stalwart Mail Server. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * in the LICENSE file at the top-level directory of this distribution. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * You can be released from the requirements of the AGPLv3 license by + * purchasing a commercial license. Please contact licensing@stalw.art + * for more details. +*/ + +use std::{ + collections::BTreeMap, + time::{Duration, Instant}, +}; + +use ahash::AHashMap; +use deadpool_postgres::Object; +use rand::Rng; +use tokio_postgres::{error::SqlState, IsolationLevel}; + +use crate::{ + write::{ + bitmap::{block_contains, DenseBitmap}, + Batch, BitmapClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS, MAX_COMMIT_TIME, + }, + BitmapKey, IndexKey, Key, LogKey, ValueKey, SUBSPACE_COUNTERS, +}; + +use super::PostgresStore; + +impl PostgresStore { + pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> { + let mut conn = self.conn_pool.get().await?; + let start = Instant::now(); + let mut retry_count = 0; + + loop { + match self.write_trx(&mut conn, &batch).await { + Ok(success) => { + return if success { + Ok(()) + } else { + Err(crate::Error::AssertValueFailed) + }; + } + Err(err) => match err.code() { + Some( + &SqlState::T_R_SERIALIZATION_FAILURE + | &SqlState::T_R_DEADLOCK_DETECTED + | &SqlState::UNIQUE_VIOLATION, + ) if retry_count < MAX_COMMIT_ATTEMPTS && start.elapsed() < MAX_COMMIT_TIME => { + let backoff = rand::thread_rng().gen_range(50..=300); + tokio::time::sleep(Duration::from_millis(backoff)).await; + retry_count += 1; + } + Some(&SqlState::UNIQUE_VIOLATION) => { + return Err(crate::Error::AssertValueFailed); + } + _ => return Err(err.into()), + }, + } + } + } + + async fn write_trx( + &self, + conn: &mut Object, + batch: &Batch, + ) -> Result { + let mut account_id = u32::MAX; + let mut collection = u8::MAX; + let mut document_id = u32::MAX; + let mut asserted_values = AHashMap::new(); + let trx = conn + .build_transaction() + .isolation_level(IsolationLevel::ReadCommitted) + .start() + .await?; + + // Sort the operations by key to avoid deadlocks + let mut assert_values = BTreeMap::new(); + let mut bitmap_updates = BTreeMap::new(); + for op in &batch.ops { + match op { + Operation::AccountId { + account_id: account_id_, + } => { + account_id = *account_id_; + } + Operation::Collection { + collection: collection_, + } => { + collection = *collection_; + } + Operation::DocumentId { + document_id: document_id_, + } => { + document_id = *document_id_; + } + Operation::AssertValue { + class, + assert_value, + } => { + let key = ValueKey { + account_id, + collection, + document_id, + class, + }; + let table = char::from(key.subspace()); + assert_values.insert(key.serialize(0), (table, assert_value)); + } + Operation::Bitmap { class, set } => { + bitmap_updates + .entry( + BitmapKey { + account_id, + collection, + class, + block_num: DenseBitmap::block_num(document_id), + } + .serialize(0), + ) + .or_insert_with(Vec::new) + .push((*set, document_id)); + } + _ => {} + } + } + + // Assert values + for (key, (table, assert_value)) in assert_values { + let s = trx + .prepare_cached(&format!("SELECT v FROM {} WHERE k = $1 FOR UPDATE", table)) + .await?; + let (exists, matches) = trx + .query_opt(&s, &[&key]) + .await? + .map(|row| { + row.try_get::<_, &[u8]>(0) + .map_or((true, false), |v| (true, assert_value.matches(v))) + }) + .unwrap_or_else(|| (false, assert_value.is_none())); + if !matches { + return Ok(false); + } + asserted_values.insert(key, exists); + } + + // Update bitmaps + for (key, changes) in bitmap_updates { + // Try updating the bitmap first + let mut update_query = String::from("v"); + let mut has_inserts = false; + for (set, document_id) in &changes { + update_query = format!( + "set_bit({update_query},{},{})", + DenseBitmap::block_index(*document_id), + *set as i8 + ); + has_inserts = has_inserts || *set; + } + + let s = trx + .prepare(&format!("UPDATE b SET v = {update_query} WHERE k = $1")) + .await?; + if trx.execute(&s, &[&key]).await? == 0 && has_inserts { + // The bitmap does not exist, create it + let mut dense_bm = DenseBitmap::empty(); + for (set, document_id) in changes { + if set { + dense_bm.set(document_id); + } + } + let s = trx + .prepare(&format!( + "INSERT INTO b (k, v) VALUES ($1, $2) ON CONFLICT(k) DO UPDATE SET v = {}", + update_query.replace("(v,", "(b.v,") + )) + .await?; + trx.execute(&s, &[&key, &&dense_bm.bitmap[..]]).await?; + } + } + + // Apply the operations + account_id = u32::MAX; + collection = u8::MAX; + document_id = u32::MAX; + for op in &batch.ops { + match op { + Operation::AccountId { + account_id: account_id_, + } => { + account_id = *account_id_; + } + Operation::Collection { + collection: collection_, + } => { + collection = *collection_; + } + Operation::DocumentId { + document_id: document_id_, + } => { + document_id = *document_id_; + } + Operation::Value { + class, + op: ValueOp::Add(by), + } => { + let key = ValueKey { + account_id, + collection, + document_id, + class, + } + .serialize(0); + + if *by >= 0 { + let s = trx + .prepare_cached(concat!( + "INSERT INTO c (k, v) VALUES ($1, $2) ", + "ON CONFLICT(k) DO UPDATE SET v = c.v + EXCLUDED.v" + )) + .await?; + trx.execute(&s, &[&key, &by]).await?; + } else { + let s = trx + .prepare_cached("UPDATE c SET v = v + $1 WHERE k = $2") + .await?; + trx.execute(&s, &[&by, &key]).await?; + } + } + Operation::Value { class, op } => { + let key = ValueKey { + account_id, + collection, + document_id, + class, + }; + let table = char::from(key.subspace()); + let key = key.serialize(0); + + if let ValueOp::Set(value) = op { + let s = if let Some(exists) = asserted_values.get(&key) { + if *exists { + trx.prepare_cached(&format!( + "UPDATE {} SET v = $2 WHERE k = $1", + table + )) + .await? + } else { + trx.prepare_cached(&format!( + "INSERT INTO {} (k, v) VALUES ($1, $2)", + table + )) + .await? + } + } else { + trx.prepare_cached(&format!( + concat!( + "INSERT INTO {} (k, v) VALUES ($1, $2) ", + "ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v" + ), + table + )) + .await? + }; + + if trx.execute(&s, &[&key, value]).await? == 0 { + return Ok(false); + } + + if matches!(class, ValueClass::ReservedId) { + // Make sure the reserved id is not already in use + let block_num = DenseBitmap::block_num(document_id); + let s = trx.prepare_cached("SELECT v FROM b WHERE k = $1").await?; + let key = BitmapKey { + account_id, + collection, + class: BitmapClass::DocumentIds, + block_num, + } + .serialize(0); + + if let Some(row) = trx.query_opt(&s, &[&key]).await? { + if block_contains(row.get(0), block_num, document_id) { + return Ok(false); + } + } + } + } else { + let s = trx + .prepare_cached(&format!("DELETE FROM {} WHERE k = $1", table)) + .await?; + trx.execute(&s, &[&key]).await?; + } + } + Operation::Index { field, key, set } => { + let key = IndexKey { + account_id, + collection, + document_id, + field: *field, + key, + } + .serialize(0); + + let s = if *set { + trx.prepare_cached( + "INSERT INTO i (k) VALUES ($1) ON CONFLICT (k) DO NOTHING", + ) + .await? + } else { + trx.prepare_cached("DELETE FROM i WHERE k = $1").await? + }; + trx.execute(&s, &[&key]).await?; + } + + Operation::Log { + collection, + change_id, + set, + } => { + let key = LogKey { + account_id, + collection: *collection, + change_id: *change_id, + } + .serialize(0); + + let s = trx + .prepare_cached(concat!( + "INSERT INTO l (k, v) VALUES ($1, $2) ", + "ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v" + )) + .await?; + trx.execute(&s, &[&key, set]).await?; + } + Operation::Bitmap { .. } | Operation::AssertValue { .. } => {} + } + } + + trx.commit().await.map(|_| true) + } + + pub(crate) async fn purge_store(&self) -> crate::Result<()> { + let todo = "delete bitmaps"; + let conn = self.conn_pool.get().await?; + + let s = conn + .prepare_cached(&format!( + "DELETE FROM {} WHERE v = 0", + char::from(SUBSPACE_COUNTERS), + )) + .await?; + conn.execute(&s, &[]).await.map(|_| ()).map_err(Into::into) + } + + pub(crate) async fn delete_range(&self, from: impl Key, to: impl Key) -> crate::Result<()> { + let conn = self.conn_pool.get().await?; + + let s = conn + .prepare_cached(&format!( + "DELETE FROM {} WHERE k >= $1 AND k < $2", + char::from(from.subspace()), + )) + .await?; + conn.execute(&s, &[&from.serialize(0), &to.serialize(0)]) + .await + .map(|_| ()) + .map_err(Into::into) + } +} diff --git a/crates/store/src/backend/rocksdb/bitmap.rs b/crates/store/src/backend/rocksdb/bitmap.rs index 8cf956cf..eb3d1407 100644 --- a/crates/store/src/backend/rocksdb/bitmap.rs +++ b/crates/store/src/backend/rocksdb/bitmap.rs @@ -58,8 +58,10 @@ pub fn deserialize_bitlist(bm: &mut RoaringBitmap, bytes: &[u8]) { } #[inline(always)] -pub fn deserialize_bitmap(bytes: &[u8]) -> Option { - RoaringBitmap::deserialize_unchecked_from(&bytes[1..]).ok() +pub fn deserialize_bitmap(bytes: &[u8]) -> crate::Result { + RoaringBitmap::deserialize_unchecked_from(bytes).map_err(|err| { + crate::Error::InternalError(format!("Failed to deserialize bitmap: {}", err)) + }) } impl Deserialize for RoaringBitmap { @@ -68,9 +70,7 @@ impl Deserialize for RoaringBitmap { .first() .ok_or_else(|| crate::Error::InternalError("Empty bitmap".to_string()))? { - IS_BITMAP => deserialize_bitmap(bytes).ok_or_else(|| { - crate::Error::InternalError("Failed to deserialize bitmap".to_string()) - }), + IS_BITMAP => deserialize_bitmap(&bytes[1..]), IS_BITLIST => { let mut bm = RoaringBitmap::new(); deserialize_bitlist(&mut bm, bytes); @@ -220,7 +220,7 @@ pub fn bitmap_merge<'x>( for op in operands.into_iter() { match *op.first()? { IS_BITMAP => { - if let Some(union_bm) = deserialize_bitmap(op) { + if let Ok(union_bm) = deserialize_bitmap(&op[1..]) { if !bm.is_empty() { bm |= union_bm; } else { diff --git a/crates/store/src/write/bitmap.rs b/crates/store/src/write/bitmap.rs index 8759c7d2..1c7190d7 100644 --- a/crates/store/src/write/bitmap.rs +++ b/crates/store/src/write/bitmap.rs @@ -65,9 +65,16 @@ impl DenseBitmap { self.bitmap[(index / 8) as usize] &= !(1 << (index & 7)); } + #[inline(always)] pub fn block_num(index: u32) -> u32 { index / BITS_PER_BLOCK_L } + + #[inline(always)] + pub fn block_index(index: u32) -> u32 { + index & BITS_MASK_L + } + } pub trait DeserializeBlock {