Faster IMAP UID generation (MySQL and RocksDB impl)
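In outline: the write path gains a ValueOp::AddAndGet operation, so a counter (such as the next IMAP UID) is incremented and its new value returned from the same write batch, instead of a blind AtomicAdd followed by a separate read of the counter key. write() on the affected backends now returns crate::Result<Option<i64>>, carrying the post-increment value when the batch contained an AddAndGet. A minimal caller-side sketch (the batch construction here is illustrative; only Operation, ValueOp, and the new write() signature come from this diff):

    // Hypothetical caller: allocate the next IMAP UID in one store round trip.
    async fn next_uid(store: &Store, uid_counter: ValueClass) -> crate::Result<i64> {
        let mut batch = Batch { ops: Vec::new() }; // illustrative construction
        batch.ops.push(Operation::Value {
            class: uid_counter,
            op: ValueOp::AddAndGet(1), // increment and read back atomically
        });
        store.write(batch).await?.ok_or_else(|| {
            crate::Error::InternalError("AddAndGet returned no value".to_string())
        })
    }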

mdecimus 2024-03-06 18:54:05 +01:00
parent 7041d495fc
commit 5b30d49327
10 changed files with 194 additions and 154 deletions


@@ -31,9 +31,9 @@ tracing = "0.1"
 jemallocator = "0.5.0"
 
 [features]
 #default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
-default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
 #default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
+default = ["foundationdb", "postgres"]
 #default = ["foundationdb", "postgres", "mysql"]
 sqlite = ["store/sqlite"]
 foundationdb = ["store/foundation"]
 postgres = ["store/postgres"]


@@ -43,9 +43,3 @@ impl From<FdbError> for Error {
         Self::InternalError(format!("FoundationDB error: {}", error.message()))
     }
 }
-
-fn deserialize_i64_le(bytes: &[u8]) -> crate::Result<i64> {
-    Ok(i64::from_le_bytes(bytes[..].try_into().map_err(|_| {
-        crate::Error::InternalError("Invalid counter value.".to_string())
-    })?))
-}


@@ -30,6 +30,7 @@ use futures::StreamExt;
 use roaring::RoaringBitmap;
 
 use crate::{
+    backend::deserialize_i64_le,
     write::{
         bitmap::DeserializeBlock,
         key::{DeserializeBigEndian, KeySerializer},
@@ -38,7 +39,7 @@ use crate::{
     BitmapKey, Deserialize, IterateParams, Key, ValueKey, U32_LEN, WITH_SUBSPACE,
 };
 
-use super::{deserialize_i64_le, FdbStore, MAX_VALUE_SIZE};
+use super::{FdbStore, MAX_VALUE_SIZE};
 
 #[cfg(feature = "fdb-chunked-bm")]
 pub(crate) enum ChunkedBitmap {


@@ -35,6 +35,7 @@ use futures::StreamExt;
 use rand::Rng;
 
 use crate::{
+    backend::deserialize_i64_le,
     write::{
         bitmap::{block_contains, DenseBitmap},
         key::KeySerializer,
@@ -45,7 +46,6 @@
 };
 
 use super::{
-    deserialize_i64_le,
     read::{read_chunked_value, ChunkedValue},
     FdbStore, MAX_VALUE_SIZE,
 };


@@ -54,3 +54,9 @@ impl From<std::io::Error> for crate::Error {
         Self::InternalError(format!("IO error: {}", err))
     }
 }
+
+fn deserialize_i64_le(bytes: &[u8]) -> crate::Result<i64> {
+    Ok(i64::from_le_bytes(bytes[..].try_into().map_err(|_| {
+        crate::Error::InternalError("Failed to deserialize i64 value.".to_string())
+    })?))
+}
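(The helper moves from the FoundationDB backend, where it is removed above, into the shared backend module, so that the MySQL and RocksDB write paths added below can reuse it for decoding little-endian counter values.)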


@@ -37,19 +37,15 @@ use crate::{
 
 use super::MysqlStore;
 
 impl MysqlStore {
-    pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
+    pub(crate) async fn write(&self, batch: Batch) -> crate::Result<Option<i64>> {
         let start = Instant::now();
         let mut retry_count = 0;
         let mut conn = self.conn_pool.get_conn().await?;
 
         loop {
             match self.write_trx(&mut conn, &batch).await {
-                Ok(success) => {
-                    return if success {
-                        Ok(())
-                    } else {
-                        Err(crate::Error::AssertValueFailed)
-                    };
+                Ok(result) => {
+                    return result;
                 }
                 Err(Error::Server(err))
                     if [1062, 1213].contains(&err.code)
@@ -67,7 +63,11 @@
         }
     }
 
-    async fn write_trx(&self, conn: &mut Conn, batch: &Batch) -> Result<bool, mysql_async::Error> {
+    async fn write_trx(
+        &self,
+        conn: &mut Conn,
+        batch: &Batch,
+    ) -> Result<crate::Result<Option<i64>>, mysql_async::Error> {
         let mut account_id = u32::MAX;
         let mut collection = u8::MAX;
         let mut document_id = u32::MAX;
@@ -77,6 +77,7 @@
             .with_consistent_snapshot(false)
             .with_isolation_level(IsolationLevel::ReadCommitted);
         let mut trx = conn.start_transaction(tx_opts).await?;
+        let mut result = None;
 
         for op in &batch.ops {
             match op {
@@ -95,31 +96,6 @@
                 } => {
                     document_id = *document_id_;
                 }
-                Operation::Value {
-                    class,
-                    op: ValueOp::AtomicAdd(by),
-                } => {
-                    let key = ValueKey {
-                        account_id,
-                        collection,
-                        document_id,
-                        class,
-                    }
-                    .serialize(0);
-
-                    if *by >= 0 {
-                        let s = trx
-                            .prep(concat!(
-                                "INSERT INTO c (k, v) VALUES (?, ?) ",
-                                "ON DUPLICATE KEY UPDATE v = v + VALUES(v)"
-                            ))
-                            .await?;
-                        trx.exec_drop(&s, (key, by)).await?;
-                    } else {
-                        let s = trx.prep("UPDATE c SET v = v + ? WHERE k = ?").await?;
-                        trx.exec_drop(&s, (by, key)).await?;
-                    }
-                }
                 Operation::Value { class, op } => {
                     let key = ValueKey {
                         account_id,
@@ -130,57 +106,99 @@
                     let table = char::from(key.subspace());
                     let key = key.serialize(0);
 
-                    if let ValueOp::Set(value) = op {
-                        let exists = asserted_values.get(&key);
-                        let s = if let Some(exists) = exists {
-                            if *exists {
-                                trx.prep(&format!("UPDATE {} SET v = :v WHERE k = :k", table))
-                                    .await?
-                            } else {
-                                trx.prep(&format!("INSERT INTO {} (k, v) VALUES (:k, :v)", table))
-                                    .await?
-                            }
-                        } else {
-                            trx
-                                .prep(
-                                    &format!("INSERT INTO {} (k, v) VALUES (:k, :v) ON DUPLICATE KEY UPDATE v = VALUES(v)", table),
-                                )
-                                .await?
-                        };
-
-                        match trx.exec_drop(&s, params! {"k" => key, "v" => value}).await {
-                            Ok(_) => {
-                                if exists.is_some() && trx.affected_rows() == 0 {
-                                    trx.rollback().await?;
-                                    return Ok(false);
-                                }
-                            }
-                            Err(err) => {
-                                trx.rollback().await?;
-                                return Err(err);
-                            }
-                        }
-
-                        if matches!(class, ValueClass::ReservedId) {
-                            // Make sure the reserved id is not already in use
-                            let s = trx.prep("SELECT 1 FROM b WHERE k = ?").await?;
-                            let key = BitmapKey {
-                                account_id,
-                                collection,
-                                class: BitmapClass::DocumentIds,
-                                block_num: document_id,
-                            }
-                            .serialize(0);
-                            if trx.exec_first::<Row, _, _>(&s, (key,)).await?.is_some() {
-                                trx.rollback().await?;
-                                return Ok(false);
-                            }
-                        }
-                    } else {
-                        let s = trx
-                            .prep(&format!("DELETE FROM {} WHERE k = ?", table))
-                            .await?;
-                        trx.exec_drop(&s, (key,)).await?;
-                    }
+                    match op {
+                        ValueOp::Set(value) => {
+                            let exists = asserted_values.get(&key);
+                            let s = if let Some(exists) = exists {
+                                if *exists {
+                                    trx.prep(&format!("UPDATE {} SET v = :v WHERE k = :k", table))
+                                        .await?
+                                } else {
+                                    trx.prep(&format!(
+                                        "INSERT INTO {} (k, v) VALUES (:k, :v)",
+                                        table
+                                    ))
+                                    .await?
+                                }
+                            } else {
+                                trx
+                                    .prep(
+                                        &format!("INSERT INTO {} (k, v) VALUES (:k, :v) ON DUPLICATE KEY UPDATE v = VALUES(v)", table),
+                                    )
+                                    .await?
+                            };
+
+                            match trx.exec_drop(&s, params! {"k" => key, "v" => value}).await {
+                                Ok(_) => {
+                                    if exists.is_some() && trx.affected_rows() == 0 {
+                                        trx.rollback().await?;
+                                        return Ok(Err(crate::Error::AssertValueFailed));
+                                    }
+                                }
+                                Err(err) => {
+                                    trx.rollback().await?;
+                                    return Err(err);
+                                }
+                            }
+
+                            if matches!(class, ValueClass::ReservedId) {
+                                // Make sure the reserved id is not already in use
+                                let s = trx.prep("SELECT 1 FROM b WHERE k = ?").await?;
+                                let key = BitmapKey {
+                                    account_id,
+                                    collection,
+                                    class: BitmapClass::DocumentIds,
+                                    block_num: document_id,
+                                }
+                                .serialize(0);
+                                if trx.exec_first::<Row, _, _>(&s, (key,)).await?.is_some() {
+                                    trx.rollback().await?;
+                                    return Ok(Err(crate::Error::AssertValueFailed));
+                                }
+                            }
+                        }
+                        ValueOp::AtomicAdd(by) => {
+                            if *by >= 0 {
+                                let s = trx
+                                    .prep(concat!(
+                                        "INSERT INTO c (k, v) VALUES (?, ?) ",
+                                        "ON DUPLICATE KEY UPDATE v = v + VALUES(v)"
+                                    ))
+                                    .await?;
+                                trx.exec_drop(&s, (key, by)).await?;
+                            } else {
+                                let s = trx.prep("UPDATE c SET v = v + ? WHERE k = ?").await?;
+                                trx.exec_drop(&s, (by, key)).await?;
+                            }
+                        }
+                        ValueOp::AddAndGet(by) => {
+                            let s = trx
+                                .prep(concat!(
+                                    "INSERT INTO c (k, v) VALUES (:k, LAST_INSERT_ID(:v)) ",
+                                    "ON DUPLICATE KEY UPDATE v = LAST_INSERT_ID(v + :v)"
+                                ))
+                                .await?;
+                            trx.exec_drop(&s, params! {"k" => key, "v" => by}).await?;
+                            let s = trx.prep("SELECT LAST_INSERT_ID()").await?;
+                            result = trx
+                                .exec_first::<i64, _, _>(&s, ())
+                                .await?
+                                .ok_or_else(|| {
+                                    mysql_async::Error::Io(mysql_async::IoError::Io(
+                                        std::io::Error::new(
+                                            std::io::ErrorKind::Other,
+                                            "LAST_INSERT_ID() did not return a value",
+                                        ),
+                                    ))
+                                })
+                                .map(Some)?;
+                        }
+                        ValueOp::Clear => {
+                            let s = trx
+                                .prep(&format!("DELETE FROM {} WHERE k = ?", table))
+                                .await?;
+                            trx.exec_drop(&s, (key,)).await?;
+                        }
+                    }
                 }
                 Operation::Index { field, key, set } => {
@@ -260,14 +278,14 @@
                         .unwrap_or_else(|| (false, assert_value.is_none()));
                     if !matches {
                         trx.rollback().await?;
-                        return Ok(false);
+                        return Ok(Err(crate::Error::AssertValueFailed));
                     }
 
                     asserted_values.insert(key, exists);
                 }
             }
         }
 
-        trx.commit().await.map(|_| true)
+        trx.commit().await.map(|_| Ok(result))
     }
 
     pub(crate) async fn purge_store(&self) -> crate::Result<()> {
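The MySQL implementation of AddAndGet leans on a documented server idiom: LAST_INSERT_ID(expr) returns expr and remembers it as the connection's last-insert-id, so wrapping both the inserted value and the ON DUPLICATE KEY UPDATE expression in it makes the post-increment counter readable with SELECT LAST_INSERT_ID(), with no second lookup of the row and no interference from other connections. A standalone sketch of the same idiom (table layout `c (k, v)` as in this diff; the wrapper function itself is illustrative):

    use mysql_async::{params, prelude::Queryable, Conn};

    // Upsert-and-read a counter: the SELECT reads a connection-local value,
    // so concurrent writers on other connections cannot interleave.
    async fn add_and_get(conn: &mut Conn, key: Vec<u8>, by: i64) -> mysql_async::Result<i64> {
        conn.exec_drop(
            concat!(
                "INSERT INTO c (k, v) VALUES (:k, LAST_INSERT_ID(:v)) ",
                "ON DUPLICATE KEY UPDATE v = LAST_INSERT_ID(v + :v)"
            ),
            params! { "k" => key, "v" => by },
        )
        .await?;
        let num: Option<i64> = conn.exec_first("SELECT LAST_INSERT_ID()", ()).await?;
        Ok(num.unwrap_or(by)) // the SELECT always yields one row; fall back defensively
    }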


@@ -39,6 +39,7 @@ use super::{
     RocksDbStore, CF_BITMAPS, CF_COUNTERS, CF_INDEXES, CF_LOGS, CF_VALUES,
 };
 
 use crate::{
+    backend::deserialize_i64_le,
     write::{
         Batch, BitmapClass, LookupClass, Operation, ValueClass, ValueOp, MAX_COMMIT_ATTEMPTS,
         MAX_COMMIT_TIME,
@@ -47,7 +48,7 @@
 };
 
 impl RocksDbStore {
-    pub(crate) async fn write(&self, batch: Batch) -> crate::Result<()> {
+    pub(crate) async fn write(&self, batch: Batch) -> crate::Result<Option<i64>> {
         let db = self.db.clone();
 
         self.spawn_worker(move || {
@@ -68,8 +69,8 @@
             let start = Instant::now();
 
             loop {
                 match txn.commit() {
-                    Ok(_) => {
-                        return Ok(());
+                    Ok(result) => {
+                        return Ok(result);
                     }
                     Err(CommitError::Internal(err)) => return Err(err),
                     Err(CommitError::RocksDB(err)) => match err.kind() {
@@ -175,10 +176,11 @@ enum CommitError {
 }
 
 impl<'x> RocksDBTransaction<'x> {
-    fn commit(&self) -> Result<(), CommitError> {
+    fn commit(&self) -> Result<Option<i64>, CommitError> {
         let mut account_id = u32::MAX;
         let mut collection = u8::MAX;
         let mut document_id = u32::MAX;
+        let mut result = None;
 
         let txn = self
             .db
@@ -202,73 +204,82 @@
                 } => {
                     document_id = *document_id_;
                 }
-                Operation::Value {
-                    class,
-                    op: ValueOp::AtomicAdd(by),
-                } => {
-                    let key = ValueKey {
-                        account_id,
-                        collection,
-                        document_id,
-                        class,
-                    }
-                    .serialize(0);
-
-                    txn.merge_cf(&self.cf_counters, &key, &by.to_le_bytes()[..])?;
-                }
                 Operation::Value { class, op } => {
                     let key = ValueKey {
                         account_id,
                         collection,
                         document_id,
                         class,
-                    }
-                    .serialize(0);
-
-                    if let ValueOp::Set(value) = op {
-                        txn.put_cf(&self.cf_values, &key, value)?;
-
-                        if matches!(class, ValueClass::ReservedId) {
-                            if let Some(bitmap) = txn
-                                .get_pinned_for_update_cf(
-                                    &self.cf_bitmaps,
-                                    &BitmapKey {
-                                        account_id,
-                                        collection,
-                                        class: BitmapClass::DocumentIds,
-                                        block_num: 0,
-                                    }
-                                    .serialize(WITHOUT_BLOCK_NUM),
-                                    true,
-                                )
-                                .map_err(CommitError::from)
-                                .and_then(|bytes| {
-                                    if let Some(bytes) = bytes {
-                                        RoaringBitmap::deserialize(&bytes)
-                                            .map(Some)
-                                            .map_err(CommitError::from)
-                                    } else {
-                                        Ok(None)
-                                    }
-                                })?
-                            {
-                                if bitmap.contains(document_id) {
-                                    txn.rollback()?;
-                                    return Err(CommitError::Internal(
-                                        crate::Error::AssertValueFailed,
-                                    ));
-                                }
-                            }
-                        }
-                    } else {
-                        txn.delete_cf(
-                            if matches!(class, ValueClass::Lookup(LookupClass::Counter(_))) {
-                                &self.cf_counters
-                            } else {
-                                &self.cf_values
-                            },
-                            &key,
-                        )?;
-                    }
+                    };
+                    let is_counter = key.is_counter();
+                    let key = key.serialize(0);
+
+                    match op {
+                        ValueOp::Set(value) => {
+                            txn.put_cf(&self.cf_values, &key, value)?;
+
+                            if matches!(class, ValueClass::ReservedId) {
+                                if let Some(bitmap) = txn
+                                    .get_pinned_for_update_cf(
+                                        &self.cf_bitmaps,
+                                        &BitmapKey {
+                                            account_id,
+                                            collection,
+                                            class: BitmapClass::DocumentIds,
+                                            block_num: 0,
+                                        }
+                                        .serialize(WITHOUT_BLOCK_NUM),
+                                        true,
+                                    )
+                                    .map_err(CommitError::from)
+                                    .and_then(|bytes| {
+                                        if let Some(bytes) = bytes {
+                                            RoaringBitmap::deserialize(&bytes)
+                                                .map(Some)
+                                                .map_err(CommitError::from)
+                                        } else {
+                                            Ok(None)
+                                        }
+                                    })?
+                                {
+                                    if bitmap.contains(document_id) {
+                                        txn.rollback()?;
+                                        return Err(CommitError::Internal(
+                                            crate::Error::AssertValueFailed,
+                                        ));
+                                    }
+                                }
+                            }
+                        }
+                        ValueOp::AtomicAdd(by) => {
+                            txn.merge_cf(&self.cf_counters, &key, &by.to_le_bytes()[..])?;
+                        }
+                        ValueOp::AddAndGet(by) => {
+                            let num = txn
+                                .get_pinned_for_update_cf(&self.cf_counters, &key, true)
+                                .map_err(CommitError::from)
+                                .and_then(|bytes| {
+                                    if let Some(bytes) = bytes {
+                                        deserialize_i64_le(&bytes)
+                                            .map(|v| v + *by)
+                                            .map_err(CommitError::from)
+                                    } else {
+                                        Ok(*by)
+                                    }
+                                })?;
+                            txn.put_cf(&self.cf_counters, &key, &num.to_le_bytes()[..])?;
+                            result = Some(num);
+                        }
+                        ValueOp::Clear => {
+                            txn.delete_cf(
+                                if is_counter {
+                                    &self.cf_counters
+                                } else {
+                                    &self.cf_values
+                                },
+                                &key,
+                            )?;
+                        }
+                    }
                 }
                 Operation::Index { field, key, set } => {
@@ -342,7 +353,7 @@
                 }
             }
 
-            txn.commit().map_err(Into::into)
+            txn.commit().map(|_| result).map_err(Into::into)
         } else {
             let mut wb = txn.get_writebatch();
             for op in &self.batch.ops {
@@ -449,7 +460,7 @@
                 }
             }
 
-            self.db.write(wb).map_err(Into::into)
+            self.db.write(wb).map(|_| result).map_err(Into::into)
         }
     }
 }
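On RocksDB the two counter operations now split cleanly: AtomicAdd remains a blind merge_cf (the counters column family's merge operator sums little-endian i64 operands), while AddAndGet must lock the key, read, add, write back, and surface the sum, since the caller needs the result. A standalone sketch of that read-modify-write half against the rust-rocksdb transaction API (the wrapper function and error handling are illustrative; assumes well-formed 8-byte counter values):

    use rocksdb::{AsColumnFamilyRef, Transaction, TransactionDB};

    // Increment a counter and return the new value. get_for_update_cf takes an
    // exclusive lock on the key, so no other transaction can write it between
    // our read and our put.
    fn add_and_get(
        txn: &Transaction<'_, TransactionDB>,
        cf: &impl AsColumnFamilyRef,
        key: &[u8],
        by: i64,
    ) -> Result<i64, rocksdb::Error> {
        let num = match txn.get_for_update_cf(cf, key, true)? {
            Some(bytes) => i64::from_le_bytes(bytes[..8].try_into().unwrap()) + by,
            None => by, // a missing counter starts from zero
        };
        txn.put_cf(cf, key, num.to_le_bytes())?;
        Ok(num)
    }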


@@ -220,6 +220,10 @@ impl Batch {
                     class: ValueClass::ReservedId,
                     op: ValueOp::Set(_)
                 }
+                | Operation::Value {
+                    op: ValueOp::AddAndGet(_),
+                    ..
+                }
             )
         })
     }
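The new arm classifies any batch containing an AddAndGet together with asserting writes, forcing it down the transactional code path; the blind write-batch path has no way to return the new counter value. Schematically (the enclosing predicate's name is outside the visible hunk, so this wrapper and the AssertValue shape are illustrative):

    fn needs_transaction(batch: &Batch) -> bool {
        batch.ops.iter().any(|op| {
            matches!(
                op,
                Operation::AssertValue { .. }
                    | Operation::Value {
                        class: ValueClass::ReservedId,
                        op: ValueOp::Set(_)
                    }
                    | Operation::Value {
                        op: ValueOp::AddAndGet(_),
                        ..
                    }
            )
        })
    }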


@@ -169,6 +169,16 @@ impl<T: AsRef<ValueClass>> ValueKey<T> {
             ..self
         }
     }
+
+    pub fn is_counter(&self) -> bool {
+        match self.class.as_ref() {
+            ValueClass::Directory(DirectoryClass::UsedQuota(_))
+            | ValueClass::Lookup(LookupClass::Counter(_))
+            | ValueClass::Queue(QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_)) => true,
+            ValueClass::Property(84) if self.collection == 1 => true, // TODO: Find a more elegant way to do this
+            _ => false,
+        }
+    }
 }
 
 impl Key for IndexKeyPrefix {
@@ -220,14 +230,10 @@ impl Key for LogKey {
 impl<T: AsRef<ValueClass> + Sync + Send> Key for ValueKey<T> {
     fn subspace(&self) -> u8 {
-        match self.class.as_ref() {
-            ValueClass::Directory(DirectoryClass::UsedQuota(_))
-            | ValueClass::Lookup(LookupClass::Counter(_))
-            | ValueClass::Queue(QueueClass::QuotaCount(_) | QueueClass::QuotaSize(_)) => {
-                SUBSPACE_COUNTERS
-            }
-            ValueClass::Property(84) if self.collection == 1 => SUBSPACE_COUNTERS, // TODO: Find a more elegant way to do this
-            _ => SUBSPACE_VALUES,
+        if self.is_counter() {
+            SUBSPACE_COUNTERS
+        } else {
+            SUBSPACE_VALUES
         }
     }
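Pulling the counter test into is_counter() leaves a single source of truth: the subspace routing above and the ValueOp::Clear handling in the RocksDB hunk both consult it, so the two can no longer disagree about which keys live in the counters subspace. The Property(84)/collection == 1 arm carries the diff's own TODO; it appears to be the per-mailbox counter behind the faster IMAP UID generation, identified by raw property and collection ids rather than named constants.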


@@ -5,9 +5,9 @@ edition = "2021"
 resolver = "2"
 
 [features]
 #default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
-default = ["sqlite", "foundationdb", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
 #default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
+default = ["foundationdb", "postgres"]
 #default = ["foundationdb", "postgres"]
 sqlite = ["store/sqlite"]
 foundationdb = ["store/foundation"]
 postgres = ["store/postgres"]