Improved full-text indexing

This commit is contained in:
mdecimus 2024-05-12 18:15:41 +02:00
parent f4e5a0baf5
commit 398b31b40b
31 changed files with 1386 additions and 731 deletions

View file

@ -2,6 +2,35 @@
All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/).
## [0.8.0] - 2024-05-13
This version uses a different database layout which is incompatible with previous versions. Please read the [UPGRADING.md](UPGRADING.md) file for more information on how to upgrade from previous versions.
### Added
- Clustering support with node auto-discovery and partition-tolerant failure detection.
- Autoconfig and MS Autodiscover support (#336)
- New variables `retry_num`, `notify_num`, `last_error` and `last_status` available in queue expressions.
- Performance improvements, in particular for FoundationDB.
- Improved full-text indexing with lower disk space usage.
- MTA-STS policy management.
- TLSA Records generation for DANE (#397)
- Queued message visualization from the web-admin.
- Master user support.
### Changed
- Make `certificate.*` local keys by default.
- Removed `server.run-as.*` settings.
- Add Microsoft Office Macro types to bad mime types (#391)
### Fixed
- mySQL TLS support (#415)
- Resolve file macros after dropping root privileges.
- Updated order of SPF Records (#395).
- Avoid duplicate accountIds when using case insensitive external directories (#399)
- `authenticated_as` variable not usable for must-match-sender (#372)
- Remove `StandardOutput`, `StandardError` in service (#390)
- SMTP `AUTH=LOGIN` compatibility issues with Microsoft Outlook (#400)
## [0.7.3] - 2024-05-01
To upgrade, replace the `stalwart-mail` binary and then upgrade to the latest web-admin version.

Cargo.lock generated
View file

@ -567,6 +567,15 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1"
[[package]]
name = "bitpacking"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c1d3e2bfd8d06048a179f7b17afc3188effa10385e7b00dc65af6aae732ea92"
dependencies = [
"crunchy",
]
[[package]]
name = "bitvec"
version = "1.0.1"
@ -5800,6 +5809,7 @@ dependencies = [
"arc-swap",
"async-trait",
"bincode",
"bitpacking",
"blake3",
"bytes",
"deadpool",

View file

@ -51,7 +51,7 @@ Key features:
- **JMAP** server:
- JMAP Core ([RFC 8620](https://datatracker.ietf.org/doc/html/rfc8620))
- JMAP Mail ([RFC 8621](https://datatracker.ietf.org/doc/html/rfc8621))
- JMAP for Sieve Scripts ([DRAFT-SIEVE-19](https://www.ietf.org/archive/id/draft-ietf-jmap-sieve-19.html))
- JMAP for Sieve Scripts ([DRAFT-SIEVE-22](https://www.ietf.org/archive/id/draft-ietf-jmap-sieve-22.html))
- JMAP over WebSocket ([RFC 8887](https://datatracker.ietf.org/doc/html/rfc8887)), JMAP Blob Management ([RFC9404](https://www.rfc-editor.org/rfc/rfc9404.html)) and JMAP for Quotas ([RFC9425](https://www.rfc-editor.org/rfc/rfc9425.html)) extensions.
- **IMAP4** server:
- IMAP4rev2 ([RFC 9051](https://datatracker.ietf.org/doc/html/rfc9051)) full compliance.
@ -76,12 +76,14 @@ Key features:
- **Spam traps** to set up decoy email addresses that catch and analyze spam.
- **Flexible and scalable**:
- Pluggable storage backends with **RocksDB**, **FoundationDB**, **PostgreSQL**, **mySQL**, **SQLite**, **S3-Compatible**, **Redis** and **ElasticSearch** support.
- **Clustering** support with node autodiscovery and partition-tolerant failure detection.
- Built-in, **LDAP** or **SQL** authentication backend support.
- Full-text search available in 17 languages.
- Disk quotas.
- Sieve scripting language with support for all [registered extensions](https://www.iana.org/assignments/sieve-extensions/sieve-extensions.xhtml).
- Email aliases, mailing lists, subaddressing and catch-all addresses support.
- Automatic account configuration and discovery with [autoconfig](https://www.ietf.org/id/draft-bucksch-autoconfig-02.html) and [autodiscover](https://learn.microsoft.com/en-us/exchange/architecture/client-access/autodiscover?view=exchserver-2019).
- Integration with **OpenTelemetry** to enable monitoring, tracing, and performance analysis.
- Disk quotas.
- **Web-based administration**:
- Account, domain, group and mailing list management.
- SMTP queue management for messages and outbound DMARC and TLS reports.

View file

@ -1,3 +1,91 @@
Upgrading from `v0.7.3` to `v0.8.0`
-----------------------------------
Version `0.8.0` includes both performance and security enhancements that require your data to be migrated to a new database layout. Fortunately, version `0.7.3` includes a migration tool that should make this process much easier than previous upgrades. In addition to the new layout, you will have to update the systemd service file so that the service is granted the `CAP_NET_BIND_SERVICE` capability.
## Preparation
- Upgrade to version `0.7.3` if you haven't already. If you are running a version earlier than `0.7.0`, you will have to migrate your data manually using the command-line interface.
- Create a directory where your data will be exported to, for example `/opt/stalwart-mail/export`.
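  A minimal sketch of this step, assuming the default `stalwart-mail` service user and install path (adjust both to your setup):
  ```bash
  $ sudo mkdir -p /opt/stalwart-mail/export
  $ sudo chown stalwart-mail:stalwart-mail /opt/stalwart-mail/export
  ```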
## Systemd service upgrade (Linux only)
- Stop the `v0.7.3` installation:
```bash
$ sudo systemctl stop stalwart-mail
```
- Update your systemd file to include the `CAP_NET_BIND_SERVICE` capability. Open the file `/etc/systemd/system/stalwart-mail.service` in a text editor and add the following lines under the `[Service]` section:
```
User=stalwart-mail
Group=stalwart-mail
AmbientCapabilities=CAP_NET_BIND_SERVICE
```
- Reload the daemon:
```bash
$ systemctl daemon-reload
```
- Do not start the service yet.
## Data migration
- Stop Stalwart and export your data:
```bash
$ sudo systemctl stop stalwart-mail
$ sudo /opt/stalwart-mail/bin/stalwart-mail --config /opt/stalwart-mail/etc/config.toml --export /opt/stalwart-mail/export
$ sudo chown -R stalwart-mail:stalwart-mail /opt/stalwart-mail/export
```
or, if you are using the Docker image:
```bash
$ docker stop stalwart-mail
$ docker run --rm -it stalwart-mail /opt/stalwart-mail/bin/stalwart-mail --config /opt/stalwart-mail/etc/config.toml --export /opt/stalwart-mail/export
```
- Backup your `v0.7.3` installation:
- If you are using RocksDB or SQLite, simply rename the `data` directory to `data-backup`, for example:
```bash
$ mv /opt/stalwart-mail/data /opt/stalwart-mail/data-backup
$ mkdir /opt/stalwart-mail/data
$ chown stalwart-mail:stalwart-mail /opt/stalwart-mail/data
```
- If you are using PostgreSQL, rename the database and create a blank database with the same name, for example:
```sql
ALTER DATABASE stalwart RENAME TO stalwart_old;
CREATE DATABASE stalwart;
```
- If you are using MySQL, rename the database and create a blank database with the same name, for example:
```sql
CREATE DATABASE stalwart_old;
RENAME TABLE stalwart.b TO stalwart_old.b;
RENAME TABLE stalwart.v TO stalwart_old.v;
RENAME TABLE stalwart.l TO stalwart_old.l;
RENAME TABLE stalwart.i TO stalwart_old.i;
RENAME TABLE stalwart.t TO stalwart_old.t;
RENAME TABLE stalwart.c TO stalwart_old.c;
DROP DATABASE stalwart;
CREATE DATABASE stalwart;
```
- If you are using FoundationDB, back up your database and then clear the entire key range.
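  As a rough sketch, the key range can be cleared from `fdbcli` once your backup is safe; `clearrange "" \xff` removes everything below `\xff`, i.e. the whole user key space (adjust the cluster file path if yours is not in the default location):
  ```bash
  $ fdbcli
  fdb> writemode on
  fdb> clearrange "" \xff
  ```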
- Download the `v0.8.0` mail-server for your platform from the [releases page](https://github.com/stalwartlabs/mail-server/releases/latest/) and replace the binary in `/opt/stalwart-mail/bin`. If you are using the Docker image, pull the latest image.
- Import your data:
```bash
$ sudo -u stalwart-mail /opt/stalwart-mail/bin/stalwart-mail --config /opt/stalwart-mail/etc/config.toml --import /opt/stalwart-mail/export
```
or, if you are using the Docker image:
```bash
$ docker run --rm -it stalwart-mail /opt/stalwart-mail/bin/stalwart-mail --config /opt/stalwart-mail/etc/config.toml --import /opt/stalwart-mail/export
```
- Start the service:
```bash
$ sudo systemctl start stalwart-mail
```
Or, if you are using the Docker image:
```bash
$ docker start stalwart-mail
```
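Optionally, tail the logs to confirm that the import completed and the server started cleanly (a generic check, not specific to this release; the unit and container names below match the commands used earlier in this guide):
```bash
$ journalctl -u stalwart-mail -f
```
or, for the Docker image:
```bash
$ docker logs -f stalwart-mail
```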
Upgrading from `v0.6.0` to `v0.7.0`
-----------------------------------

View file

@ -48,7 +48,7 @@ use utils::{
use crate::Core;
pub(super) const MAGIC_MARKER: u8 = 123;
pub(super) const FILE_VERSION: u8 = 1;
pub(super) const FILE_VERSION: u8 = 2;
#[derive(Debug)]
pub(super) enum Op {
@ -62,7 +62,7 @@ pub(super) enum Op {
#[derive(Debug, Clone, Copy)]
pub(super) enum Family {
Property = 0,
TermIndex = 1,
FtsIndex = 1,
Acl = 2,
Blob = 3,
Config = 4,
@ -229,10 +229,11 @@ impl Core {
(
tokio::spawn(async move {
writer
.send(Op::Family(Family::TermIndex))
.send(Op::Family(Family::FtsIndex))
.failed("Failed to send family");
let mut keys = BTreeSet::new();
let mut last_account_id = u32::MAX;
let mut last_collection = u8::MAX;
store
.iterate(
@ -241,68 +242,56 @@ impl Core {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::TermIndex,
class: ValueClass::FtsIndex(BitmapHash {
hash: [0; 8],
len: 1,
}),
},
ValueKey {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::TermIndex,
class: ValueClass::FtsIndex(BitmapHash {
hash: [u8::MAX; 8],
len: u8::MAX,
}),
},
)
.no_values(),
|key, _| {
),
|key, value| {
let account_id = key.deserialize_be_u32(0)?;
let collection = key.deserialize_u8(U32_LEN)?;
let document_id =
key.range(U32_LEN + 1..usize::MAX)?.deserialize_leb128()?;
let collection = key.deserialize_u8(key.len() - U32_LEN - 1)?;
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
keys.insert((account_id, collection, document_id));
if account_id != last_account_id {
writer
.send(Op::AccountId(account_id))
.failed("Failed to send account id");
last_account_id = account_id;
}
if collection != last_collection {
writer
.send(Op::Collection(collection))
.failed("Failed to send collection");
last_collection = collection;
}
writer
.send(Op::DocumentId(document_id))
.failed("Failed to send document id");
writer
.send(Op::KeyValue((
key.range(U32_LEN..key.len() - U32_LEN - 1)?.to_vec(),
value.to_vec(),
)))
.failed("Failed to send key value");
Ok(true)
},
)
.await
.failed("Failed to iterate over data store");
let mut last_account_id = u32::MAX;
let mut last_collection = u8::MAX;
for (account_id, collection, document_id) in keys {
if account_id != last_account_id {
writer
.send(Op::AccountId(account_id))
.failed("Failed to send account id");
last_account_id = account_id;
}
if collection != last_collection {
writer
.send(Op::Collection(collection))
.failed("Failed to send collection");
last_collection = collection;
}
writer
.send(Op::DocumentId(document_id))
.failed("Failed to send document id");
let value = store
.get_value::<RawBytes>(ValueKey {
account_id,
collection,
document_id,
class: ValueClass::TermIndex,
})
.await
.failed("Failed to get value")
.failed("Expected value")
.0;
writer
.send(Op::KeyValue((value.to_vec(), vec![])))
.failed("Failed to send key value");
}
}),
handle,
)

View file

@ -32,9 +32,10 @@ use store::{
roaring::RoaringBitmap,
write::{
key::DeserializeBigEndian, BatchBuilder, BitmapClass, BitmapHash, BlobOp, DirectoryClass,
LookupClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue, ValueClass,
FtsQueueClass, LookupClass, MaybeDynamicId, MaybeDynamicValue, Operation, TagValue,
ValueClass,
},
BlobStore, Store, U32_LEN,
BlobStore, Serialize, Store, U32_LEN,
};
use store::{
write::{QueueClass, QueueEvent},
@ -83,6 +84,8 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
let mut document_id = u32::MAX;
let mut collection = u8::MAX;
let mut family = Family::None;
let email_collection = u8::from(Collection::Email);
let mut seq = 0;
let mut batch_size = 0;
let mut batch = BatchBuilder::new();
@ -123,8 +126,25 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
batch.set(ValueClass::Property(field), value);
}
}
Family::TermIndex => {
batch.set(ValueClass::TermIndex, key);
Family::FtsIndex => {
if reader.version > 1 {
let mut hash = [0u8; 8];
let (hash, len) = match key.len() {
9 => {
hash[..8].copy_from_slice(&key[..8]);
(hash, key[key.len() - 1])
}
len @ (1..=7) => {
hash[..len].copy_from_slice(&key[..len]);
(hash, len as u8)
}
invalid => {
panic!("Invalid text bitmap key length {invalid}");
}
};
batch.set(ValueClass::FtsIndex(BitmapHash { hash, len }), value);
}
}
Family::Acl => {
batch.set(
@ -140,6 +160,16 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
let hash = BlobHash::try_from_hash_slice(&key).expect("Invalid blob hash");
if account_id != u32::MAX && document_id != u32::MAX {
if reader.version == 1 && collection == email_collection {
batch.set(
ValueClass::FtsQueue(FtsQueueClass::Insert {
seq,
hash: hash.clone(),
}),
0u64.serialize(),
);
seq += 1;
}
batch.set(ValueClass::Blob(BlobOp::Link { hash }), vec![]);
} else {
batch_size -= value.len();
@ -261,48 +291,55 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
set: true,
}),
Family::Bitmap => {
let key = key.as_slice();
let class: BitmapClass<MaybeDynamicId> =
match key.first().expect("Failed to read bitmap class") {
0 => BitmapClass::DocumentIds,
1 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Id(MaybeDynamicId::Static(
key.deserialize_be_u32(2).expect("Failed to read tag id"),
)),
},
2 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Text(
key.get(2..).expect("Failed to read tag text").to_vec(),
),
},
3 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Id(MaybeDynamicId::Static(
key.get(2)
.copied()
.expect("Failed to read tag static id")
.into(),
)),
},
4 => {
if reader.version == 1 && collection == email_collection {
continue;
}
BitmapClass::Text {
field: key.get(1).copied().expect("Failed to read field"),
token: BitmapHash {
len: key
.get(2)
.copied()
.expect("Failed to read tag static id"),
hash: key
.get(3..11)
.expect("Failed to read tag static id")
.try_into()
.unwrap(),
},
}
}
_ => failed("Invalid bitmap class"),
};
let document_ids = RoaringBitmap::deserialize_from(&value[..])
.expect("Failed to deserialize bitmap");
let key = key.as_slice();
let class: BitmapClass<MaybeDynamicId> = match key
.first()
.expect("Failed to read bitmap class")
{
0 => BitmapClass::DocumentIds,
1 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Id(MaybeDynamicId::Static(
key.deserialize_be_u32(2).expect("Failed to read tag id"),
)),
},
2 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Text(
key.get(2..).expect("Failed to read tag text").to_vec(),
),
},
3 => BitmapClass::Tag {
field: key.get(1).copied().expect("Failed to read field"),
value: TagValue::Id(MaybeDynamicId::Static(
key.get(2)
.copied()
.expect("Failed to read tag static id")
.into(),
)),
},
4 => BitmapClass::Text {
field: key.get(1).copied().expect("Failed to read field"),
token: BitmapHash {
len: key.get(2).copied().expect("Failed to read tag static id"),
hash: key
.get(3..11)
.expect("Failed to read tag static id")
.try_into()
.unwrap(),
},
},
_ => failed("Invalid bitmap class"),
};
for document_id in document_ids {
batch.ops.push(Operation::DocumentId { document_id });
@ -362,6 +399,7 @@ async fn restore_file(store: Store, blob_store: BlobStore, path: &Path) {
}
struct OpReader {
version: u8,
file: BufReader<File>,
}
@ -378,16 +416,16 @@ impl OpReader {
failed(&format!("Invalid magic marker in {path:?}"));
}
if file
let version = file
.read_u8()
.await
.failed(&format!("Failed to read version from {path:?}"))
!= FILE_VERSION
{
.failed(&format!("Failed to read version from {path:?}"));
if version > FILE_VERSION {
failed(&format!("Invalid file version in {path:?}"));
}
Self { file }
Self { file, version }
}
async fn next(&mut self) -> Option<Op> {
@ -439,7 +477,7 @@ impl TryFrom<u8> for Family {
fn try_from(value: u8) -> Result<Self, Self::Error> {
match value {
0 => Ok(Self::Property),
1 => Ok(Self::TermIndex),
1 => Ok(Self::FtsIndex),
2 => Ok(Self::Acl),
3 => Ok(Self::Blob),
4 => Ok(Self::Config),

View file

@ -50,7 +50,7 @@ use mail_parser::{parsers::fields::thread::thread_name, HeaderName, HeaderValue}
use store::{
write::{
log::{Changes, LogInsert},
BatchBuilder, Bincode, IndexEmailClass, MaybeDynamicId, TagValue, ValueClass, F_BITMAP,
BatchBuilder, Bincode, FtsQueueClass, MaybeDynamicId, TagValue, ValueClass, F_BITMAP,
F_VALUE,
},
BlobClass, Serialize,
@ -417,7 +417,7 @@ impl JMAP {
.value(Property::Keywords, keywords, F_VALUE | F_BITMAP)
.value(Property::Cid, change_id, F_VALUE)
.set(
ValueClass::IndexEmail(IndexEmailClass::Insert {
ValueClass::FtsQueue(FtsQueueClass::Insert {
seq: self.generate_snowflake_id()?,
hash: metadata.blob_hash.clone(),
}),

View file

@ -40,7 +40,7 @@ use store::{
query::Filter,
write::{
log::{ChangeLogBuilder, Changes, LogInsert},
now, AssignedIds, BatchBuilder, BitmapClass, IndexEmailClass, MaybeDynamicId,
now, AssignedIds, BatchBuilder, BitmapClass, FtsQueueClass, MaybeDynamicId,
MaybeDynamicValue, SerializeWithId, TagValue, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
BitmapKey, BlobClass, Serialize,
@ -340,7 +340,7 @@ impl JMAP {
.set(Property::ThreadId, maybe_thread_id)
.tag(Property::ThreadId, TagValue::Id(maybe_thread_id), 0)
.set(
ValueClass::IndexEmail(IndexEmailClass::Insert {
ValueClass::FtsQueue(FtsQueueClass::Insert {
seq: self
.generate_snowflake_id()
.map_err(|_| IngestError::Temporary)?,

View file

@ -54,7 +54,7 @@ use store::{
ahash::AHashSet,
write::{
assert::HashedValue, log::ChangeLogBuilder, BatchBuilder, Bincode, DeserializeFrom,
IndexEmailClass, SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
FtsQueueClass, SerializeInto, ToBitmaps, ValueClass, F_BITMAP, F_CLEAR, F_VALUE,
},
Serialize,
};
@ -1091,7 +1091,7 @@ impl JMAP {
.with_collection(Collection::Email)
.delete_document(document_id)
.set(
ValueClass::IndexEmail(IndexEmailClass::Delete {
ValueClass::FtsQueue(FtsQueueClass::Delete {
seq: self.generate_snowflake_id()?,
}),
0u64.serialize(),

View file

@ -67,7 +67,7 @@ impl GossiperBuilder {
.property_or_default::<u16>("cluster.bind-port", "1179")
.unwrap_or(1179),
ping_interval: config
.property_or_default("cluster.ping-interval", "1s")
.property_or_default("cluster.heartbeat", "1s")
.unwrap_or(Duration::from_secs(1)),
peers: Vec::new(),
};

View file

@ -23,14 +23,15 @@
use jmap_proto::types::{collection::Collection, property::Property};
use store::{
ahash::AHashSet,
ahash::{AHashMap, AHashSet},
fts::index::FtsDocument,
write::{
key::DeserializeBigEndian, now, BatchBuilder, Bincode, IndexEmailClass, MaybeDynamicId,
key::DeserializeBigEndian, now, BatchBuilder, Bincode, FtsQueueClass, MaybeDynamicId,
ValueClass,
},
Deserialize, IterateParams, Serialize, ValueKey, U32_LEN, U64_LEN,
};
use utils::{BlobHash, BLOB_HASH_LEN};
use crate::{
@ -57,17 +58,18 @@ impl JMAP {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::IndexEmail(IndexEmailClass::Delete { seq: 0 }),
class: ValueClass::FtsQueue(FtsQueueClass::Delete { seq: 0 }),
};
let to_key = ValueKey::<ValueClass<u32>> {
account_id: u32::MAX,
collection: u8::MAX,
document_id: u32::MAX,
class: ValueClass::IndexEmail(IndexEmailClass::Delete { seq: u64::MAX }),
class: ValueClass::FtsQueue(FtsQueueClass::Delete { seq: u64::MAX }),
};
// Retrieve entries pending to be indexed
let mut entries = Vec::new();
let mut insert_entries = Vec::new();
let mut delete_entries: AHashMap<u32, Vec<IndexEmail>> = AHashMap::new();
let mut skipped_documents = AHashSet::new();
let now = now();
let _ = self
@ -81,7 +83,14 @@ impl JMAP {
if event.lock_expiry < now {
if !skipped_documents.contains(&(event.account_id, event.document_id)) {
entries.push(event);
if event.insert_hash.is_some() {
insert_entries.push(event);
} else {
delete_entries
.entry(event.account_id)
.or_default()
.push(event);
}
} else {
tracing::trace!(
context = "queue",
@ -116,125 +125,94 @@ impl JMAP {
);
});
// Index entries
for event in entries {
// Add entries to the index
for event in insert_entries {
// Lock index
if !self.try_lock_index(&event).await {
continue;
}
if let Some(hash) = &event.insert_hash {
match self
.get_property::<Bincode<MessageMetadata>>(
event.account_id,
Collection::Email,
event.document_id,
Property::BodyStructure,
)
.await
match self
.get_property::<Bincode<MessageMetadata>>(
event.account_id,
Collection::Email,
event.document_id,
Property::BodyStructure,
)
.await
{
Ok(Some(metadata))
if metadata.inner.blob_hash.as_slice()
== event.insert_hash.as_ref().unwrap().as_slice() =>
{
Ok(Some(metadata))
if metadata.inner.blob_hash.as_slice() == hash.as_slice() =>
// Obtain raw message
let raw_message = if let Ok(Some(raw_message)) = self
.get_blob(&metadata.inner.blob_hash, 0..usize::MAX)
.await
{
// Obtain raw message
let raw_message = if let Ok(Some(raw_message)) = self
.get_blob(&metadata.inner.blob_hash, 0..usize::MAX)
.await
{
raw_message
} else {
tracing::warn!(
context = "fts_index_queued",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
blob_hash = ?metadata.inner.blob_hash,
"Message blob not found"
);
continue;
};
let message = metadata.inner.contents.into_message(&raw_message);
// Index message
let document =
FtsDocument::with_default_language(self.core.jmap.default_language)
.with_account_id(event.account_id)
.with_collection(Collection::Email)
.with_document_id(event.document_id)
.index_message(&message);
if let Err(err) = self.core.storage.fts.index(document).await {
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
reason = ?err,
"Failed to index email in FTS index"
);
continue;
}
tracing::debug!(
raw_message
} else {
tracing::warn!(
context = "fts_index_queued",
event = "index",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
"Indexed document in FTS index"
blob_hash = ?metadata.inner.blob_hash,
"Message blob not found"
);
}
continue;
};
let message = metadata.inner.contents.into_message(&raw_message);
Err(err) => {
// Index message
let document =
FtsDocument::with_default_language(self.core.jmap.default_language)
.with_account_id(event.account_id)
.with_collection(Collection::Email)
.with_document_id(event.document_id)
.index_message(&message);
if let Err(err) = self.core.storage.fts.index(document).await {
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
reason = ?err,
"Failed to retrieve email metadata"
);
break;
}
_ => {
// The message was probably deleted or overwritten
tracing::debug!(
context = "fts_index_queued",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
"Email metadata not found"
"Failed to index email in FTS index"
);
continue;
}
tracing::debug!(
context = "fts_index_queued",
event = "index",
account_id = event.account_id,
document_id = event.document_id,
"Indexed document in FTS index"
);
}
} else {
if let Err(err) = self
.core
.storage
.fts
.remove(
event.account_id,
Collection::Email.into(),
event.document_id,
)
.await
{
Err(err) => {
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
reason = ?err,
"Failed to remove document from FTS index"
"Failed to retrieve email metadata"
);
break;
}
_ => {
// The message was probably deleted or overwritten
tracing::debug!(
context = "fts_index_queued",
event = "error",
account_id = event.account_id,
document_id = event.document_id,
"Email metadata not found"
);
continue;
}
tracing::debug!(
context = "fts_index_queued",
event = "delete",
account_id = event.account_id,
document_id = event.document_id,
"Deleted document from FTS index"
);
}
// Remove entry from queue
@ -262,6 +240,71 @@ impl JMAP {
}
}
// Remove entries from the index
for (account_id, events) in delete_entries {
let mut document_ids = Vec::with_capacity(events.len());
let mut locked_events = Vec::with_capacity(events.len());
for event in events {
if self.try_lock_index(&event).await {
document_ids.push(event.document_id);
locked_events.push(event);
}
}
if document_ids.is_empty() {
continue;
}
if let Err(err) = self
.core
.storage
.fts
.remove(account_id, Collection::Email.into(), &document_ids)
.await
{
tracing::error!(
context = "fts_index_queued",
event = "error",
account_id = account_id,
document_ids = ?document_ids,
reason = ?err,
"Failed to remove documents from FTS index"
);
continue;
}
// Remove entries from the queue
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(Collection::Email);
for event in locked_events {
batch
.update_document(event.document_id)
.clear(event.value_class());
}
// Remove entry from queue
if let Err(err) = self.core.storage.data.write(batch.build()).await {
tracing::error!(
context = "fts_index_queued",
event = "error",
reason = ?err,
"Failed to remove index email from queue"
);
continue;
}
tracing::debug!(
context = "fts_index_queued",
event = "delete",
account_id = account_id,
document_ids = ?document_ids,
"Deleted document from FTS index"
);
}
if let Err(err) = self.inner.housekeeper_tx.send(Event::IndexDone).await {
tracing::warn!("Failed to send index done event to housekeeper: {}", err);
}
@ -271,6 +314,7 @@ impl JMAP {
let mut batch = BatchBuilder::new();
batch
.with_account_id(event.account_id)
.with_collection(Collection::Email)
.update_document(event.document_id)
.assert_value(event.value_class(), event.lock_expiry)
.set(event.value_class(), (now() + INDEX_LOCK_EXPIRY).serialize());
@ -282,7 +326,7 @@ impl JMAP {
event = "locked",
account_id = event.account_id,
document_id = event.document_id,
"Failed to lock index: Index already locked."
"Lock busy: Index already locked."
);
false
}
@ -302,11 +346,11 @@ impl JMAP {
impl IndexEmail {
fn value_class(&self) -> ValueClass<MaybeDynamicId> {
match &self.insert_hash {
Some(hash) => ValueClass::IndexEmail(IndexEmailClass::Insert {
Some(hash) => ValueClass::FtsQueue(FtsQueueClass::Insert {
hash: hash.clone(),
seq: self.seq,
}),
None => ValueClass::IndexEmail(IndexEmailClass::Delete { seq: self.seq }),
None => ValueClass::FtsQueue(FtsQueueClass::Delete { seq: self.seq }),
}
}
@ -314,10 +358,13 @@ impl IndexEmail {
Ok(IndexEmail {
seq: key.deserialize_be_u64(0)?,
account_id: key.deserialize_be_u32(U64_LEN)?,
document_id: key.deserialize_be_u32(U64_LEN + U32_LEN)?,
document_id: key.deserialize_be_u32(U64_LEN + U32_LEN + 1)?,
lock_expiry: u64::deserialize(value)?,
insert_hash: key
.get(U64_LEN + U32_LEN + U32_LEN..U64_LEN + U32_LEN + U32_LEN + BLOB_HASH_LEN)
.get(
U64_LEN + U32_LEN + U32_LEN + 1
..U64_LEN + U32_LEN + U32_LEN + BLOB_HASH_LEN + 1,
)
.and_then(|bytes| BlobHash::try_from_hash_slice(bytes).ok()),
})
}

View file

@ -32,8 +32,8 @@ tracing = "0.1"
jemallocator = "0.5.0"
[features]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "foundationdb"]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "foundationdb"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]

View file

@ -45,6 +45,7 @@ redis = { version = "0.25.2", features = [ "tokio-comp", "tokio-rustls-comp", "t
deadpool = { version = "0.10.0", features = ["managed"], optional = true }
bincode = "1.3.3"
arc-swap = "1.6.0"
bitpacking = "0.9.2"
[dev-dependencies]
tokio = { version = "1.23", features = ["full"] }

View file

@ -77,8 +77,8 @@ impl ElasticSearchStore {
&self,
account_id: u32,
collection: u8,
document_id: u32,
) -> crate::Result<bool> {
document_ids: &[u32],
) -> crate::Result<()> {
self.index
.delete_by_query(DeleteByQueryParts::Index(&[
INDEX_NAMES[collection as usize]
@ -88,7 +88,7 @@ impl ElasticSearchStore {
"bool": {
"must": [
{ "match": { "account_id": account_id } },
{ "match": { "document_id": document_id } }
{ "terms": { "document_id": document_ids } }
]
}
}
@ -98,7 +98,7 @@ impl ElasticSearchStore {
.map_err(Into::into)
.and_then(|response| {
if response.status_code().is_success() {
Ok(true)
Ok(())
} else {
Err(crate::Error::InternalError(format!(
"Failed to remove document: {:?}",

View file

@ -104,7 +104,7 @@ impl MysqlStore {
for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_FTS_QUEUE,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
@ -114,7 +114,7 @@ impl MysqlStore {
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS,
] {
let table = char::from(table);

View file

@ -92,7 +92,7 @@ impl PostgresStore {
for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_FTS_QUEUE,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
@ -102,7 +102,7 @@ impl PostgresStore {
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
] {

View file

@ -91,7 +91,7 @@ impl RocksDbStore {
SUBSPACE_INDEXES,
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_FTS_QUEUE,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
@ -101,7 +101,7 @@ impl RocksDbStore {
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
] {

View file

@ -105,7 +105,7 @@ impl SqliteStore {
for table in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_FTS_QUEUE,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOOKUP_VALUE,
@ -115,7 +115,7 @@ impl SqliteStore {
SUBSPACE_QUEUE_EVENT,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_FTS_INDEX,
SUBSPACE_LOGS,
SUBSPACE_BLOBS,
] {

View file

@ -61,13 +61,13 @@ impl FtsStore {
&self,
account_id: u32,
collection: u8,
document_id: u32,
) -> crate::Result<bool> {
document_ids: &[u32],
) -> crate::Result<()> {
match self {
FtsStore::Store(store) => store.fts_remove(account_id, collection, document_id).await,
FtsStore::Store(store) => store.fts_remove(account_id, collection, document_ids).await,
#[cfg(feature = "elastic")]
FtsStore::ElasticSearch(store) => {
store.fts_remove(account_id, collection, document_id).await
store.fts_remove(account_id, collection, document_ids).await
}
}
}

View file

@ -27,7 +27,8 @@ use roaring::RoaringBitmap;
use crate::{
write::{
key::KeySerializer, now, AnyKey, AssignedIds, Batch, BitmapClass, ReportClass, ValueClass,
key::KeySerializer, now, AnyKey, AssignedIds, Batch, BitmapClass, BitmapHash, ReportClass,
ValueClass,
},
BitmapKey, Deserialize, IterateParams, Key, Store, ValueKey, SUBSPACE_BITMAP_ID,
SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_INDEXES, SUBSPACE_LOGS, U32_LEN,
@ -326,7 +327,16 @@ impl Store {
for (from_class, to_class) in [
(ValueClass::Acl(account_id), ValueClass::Acl(account_id + 1)),
(ValueClass::Property(0), ValueClass::Property(0)),
(ValueClass::TermIndex, ValueClass::TermIndex),
(
ValueClass::FtsIndex(BitmapHash {
hash: [0u8; 8],
len: 0,
}),
ValueClass::FtsIndex(BitmapHash {
hash: [u8::MAX; 8],
len: u8::MAX,
}),
),
] {
self.delete_range(
ValueKey {
@ -410,7 +420,7 @@ impl Store {
SUBSPACE_BITMAP_TAG,
SUBSPACE_BITMAP_TEXT,
SUBSPACE_DIRECTORY,
SUBSPACE_FTS_INDEX,
SUBSPACE_FTS_QUEUE,
SUBSPACE_INDEXES,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
@ -425,7 +435,7 @@ impl Store {
SUBSPACE_QUOTA,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TERM_INDEX,
SUBSPACE_FTS_INDEX,
] {
self.delete_range(
AnyKey {
@ -593,7 +603,7 @@ impl Store {
for (subspace, with_values) in [
(SUBSPACE_ACL, true),
//(SUBSPACE_DIRECTORY, true),
(SUBSPACE_FTS_INDEX, true),
(SUBSPACE_FTS_QUEUE, true),
(SUBSPACE_LOOKUP_VALUE, true),
(SUBSPACE_PROPERTY, true),
(SUBSPACE_SETTINGS, true),
@ -601,7 +611,7 @@ impl Store {
(SUBSPACE_QUEUE_EVENT, true),
(SUBSPACE_REPORT_OUT, true),
(SUBSPACE_REPORT_IN, true),
(SUBSPACE_TERM_INDEX, true),
(SUBSPACE_FTS_INDEX, true),
(SUBSPACE_BLOB_RESERVE, true),
(SUBSPACE_BLOB_LINK, true),
(SUBSPACE_BLOBS, true),

View file

@ -21,9 +21,9 @@
* for more details.
*/
use std::{borrow::Cow, collections::BTreeSet, fmt::Display};
use std::{borrow::Cow, fmt::Display};
use ahash::{AHashMap, AHashSet};
use ahash::AHashMap;
use nlp::{
language::{
detect::{LanguageDetector, MIN_LANGUAGE_SCORE},
@ -32,18 +32,17 @@ use nlp::{
},
tokenizers::word::WordTokenizer,
};
use utils::codec::leb128::Leb128Reader;
use crate::{
backend::MAX_TOKEN_LENGTH,
write::{
hash::TokenType, key::KeySerializer, BatchBuilder, BitmapClass, BitmapHash, Operation,
ValueClass,
hash::TokenType, key::DeserializeBigEndian, BatchBuilder, BitmapHash, MaybeDynamicId,
Operation, ValueClass, ValueOp,
},
Deserialize, Error, Store, ValueKey, U64_LEN,
IterateParams, Serialize, Store, ValueKey, U32_LEN,
};
use super::Field;
use super::{postings::Postings, Field};
pub const TERM_INDEX_VERSION: u8 = 1;
#[derive(Debug)]
@ -137,8 +136,9 @@ impl Store {
document: FtsDocument<'_, T>,
) -> crate::Result<()> {
let mut detect = LanguageDetector::new();
let mut tokens: AHashMap<BitmapHash, AHashSet<u8>> = AHashMap::new();
let mut tokens: AHashMap<BitmapHash, Postings> = AHashMap::new();
let mut parts = Vec::new();
let mut position = 0;
for text in document.parts {
match text.typ {
@ -156,15 +156,17 @@ impl Store {
tokens
.entry(BitmapHash::new(token.word.as_ref()))
.or_default()
.insert(TokenType::word(field));
.insert(TokenType::word(field), position);
position += 1;
}
position += 10;
}
Type::Keyword => {
let field = u8::from(text.field);
tokens
.entry(BitmapHash::new(text.text.as_ref()))
.or_default()
.insert(TokenType::word(field));
.insert_keyword(TokenType::word(field));
}
}
}
@ -172,7 +174,6 @@ impl Store {
let default_language = detect
.most_frequent_language()
.unwrap_or(document.default_language);
let mut bigrams = BTreeSet::new();
for (field, language, text) in parts.into_iter() {
let language = if language != Language::Unknown {
@ -182,26 +183,23 @@ impl Store {
};
let field: u8 = field.into();
let mut last_token = Cow::Borrowed("");
for token in Stemmer::new(&text, language, MAX_TOKEN_LENGTH) {
if !last_token.is_empty() {
bigrams.insert(BitmapHash::new(&format!("{} {}", last_token, token.word)).hash);
}
tokens
.entry(BitmapHash::new(token.word.as_ref()))
.or_default()
.insert(TokenType::word(field));
.insert(TokenType::word(field), position);
if let Some(stemmed_word) = token.stemmed_word {
tokens
.entry(BitmapHash::new(stemmed_word.as_ref()))
.or_default()
.insert(TokenType::stemmed(field));
.insert_keyword(TokenType::stemmed(field));
}
last_token = token.word;
position += 1;
}
position += 10;
}
if tokens.is_empty() {
@ -209,40 +207,34 @@ impl Store {
}
// Serialize tokens
let mut serializer = KeySerializer::new(tokens.len() * U64_LEN * 2);
//let mut term_index = KeySerializer::new(tokens.len() * U64_LEN * 2);
let mut keys = Vec::with_capacity(tokens.len());
// Write bigrams
serializer = serializer.write_leb128(bigrams.len());
for bigram in bigrams {
serializer = serializer.write(bigram.as_slice());
}
// Write index keys
for (hash, fields) in tokens.into_iter() {
serializer = serializer
.write(hash.hash.as_slice())
.write(hash.len)
.write(fields.len() as u8);
for field in fields.into_iter() {
serializer = serializer.write(field);
keys.push(Operation::Bitmap {
class: BitmapClass::Text { field, token: hash },
set: true,
});
}
for (hash, postings) in tokens.into_iter() {
//term_index = term_index.write(hash.hash.as_slice()).write(hash.len);
keys.push(Operation::Value {
class: ValueClass::FtsIndex(hash),
op: ValueOp::Set(postings.serialize().into()),
});
}
// Write term index
let mut batch = BatchBuilder::new();
let mut term_index = lz4_flex::compress_prepend_size(&serializer.finalize());
/*let mut batch = BatchBuilder::new();
let mut term_index = lz4_flex::compress_prepend_size(&term_index.finalize());
term_index.insert(0, TERM_INDEX_VERSION);
batch
.with_account_id(document.account_id)
.with_collection(document.collection)
.update_document(document.document_id)
.set(ValueClass::TermIndex, term_index);
self.write(batch.build()).await?;
.set(
ValueClass::FtsIndex(BitmapHash {
hash: [u8::MAX; 8],
len: u8::MAX,
}),
term_index,
);
self.write(batch.build()).await?;*/
let mut batch = BatchBuilder::new();
batch
.with_account_id(document.account_id)
@ -272,65 +264,91 @@ impl Store {
&self,
account_id: u32,
collection: u8,
document_id: u32,
) -> crate::Result<bool> {
// Obtain term index
let term_index = if let Some(term_index) = self
.get_value::<TermIndex>(ValueKey {
account_id,
collection,
document_id,
class: ValueClass::TermIndex,
})
.await?
{
term_index
} else {
tracing::debug!(
context = "fts_remove",
event = "not_found",
account_id = account_id,
collection = collection,
document_id = document_id,
"Term index not found"
);
return Ok(false);
};
document_ids: &[u32],
) -> crate::Result<()> {
// Find keys to delete
let mut delete_keys: AHashMap<u32, Vec<ValueClass<MaybeDynamicId>>> = AHashMap::new();
self.iterate(
IterateParams::new(
ValueKey {
account_id,
collection,
document_id: 0,
class: ValueClass::FtsIndex(BitmapHash {
hash: [0; 8],
len: 1,
}),
},
ValueKey {
account_id: account_id + 1,
collection,
document_id: 0,
class: ValueClass::FtsIndex(BitmapHash {
hash: [0; 8],
len: 1,
}),
},
)
.no_values(),
|key, _| {
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if document_ids.contains(&document_id) {
let mut hash = [0u8; 8];
let (hash, len) = match key.len() - (U32_LEN * 2) - 1 {
9 => {
hash[..8].copy_from_slice(&key[U32_LEN..U32_LEN + 8]);
(hash, key[key.len() - U32_LEN - 2])
}
len @ (1..=7) => {
hash[..len].copy_from_slice(&key[U32_LEN..U32_LEN + len]);
(hash, len as u8)
}
invalid => {
return Err(format!("Invalid text bitmap key length {invalid}").into())
}
};
delete_keys
.entry(document_id)
.or_default()
.push(ValueClass::FtsIndex(BitmapHash { hash, len }));
}
Ok(true)
},
)
.await?;
// Remove keys
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(collection)
.update_document(document_id);
.with_collection(collection);
for key in term_index.ops.into_iter() {
if batch.ops.len() >= 1000 {
self.write(batch.build()).await?;
batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(collection)
.update_document(document_id);
for (document_id, keys) in delete_keys {
batch.update_document(document_id);
for key in keys {
if batch.ops.len() >= 1000 {
self.write(batch.build()).await?;
batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(collection)
.update_document(document_id);
}
batch.ops.push(Operation::Value {
class: key,
op: ValueOp::Clear,
});
}
batch.ops.push(key);
}
if !batch.is_empty() {
self.write(batch.build()).await?;
}
// Remove term index
let mut batch = BatchBuilder::new();
batch
.with_account_id(account_id)
.with_collection(collection)
.update_document(document_id)
.clear(ValueClass::TermIndex);
self.write(batch.build()).await?;
Ok(true)
Ok(())
}
pub async fn fts_remove_all(&self, _: u32) -> crate::Result<()> {
@ -341,6 +359,7 @@ impl Store {
}
}
/*
struct TermIndex {
ops: Vec<Operation>,
}
@ -403,3 +422,4 @@ impl Deserialize for TermIndex {
Ok(Self { ops })
}
}
*/

View file

@ -26,6 +26,7 @@ use std::fmt::Display;
use nlp::language::Language;
pub mod index;
pub mod postings;
pub mod query;
#[derive(Clone, Debug, PartialEq, Eq)]

View file

@ -0,0 +1,516 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use std::cmp::Ordering;
use ahash::AHashSet;
use bitpacking::{BitPacker, BitPacker1x, BitPacker4x, BitPacker8x};
use utils::codec::leb128::Leb128Reader;
use crate::{write::key::KeySerializer, Serialize};
#[derive(Default)]
pub(super) struct Postings {
fields: AHashSet<u8>,
postings: Vec<u32>,
}
#[derive(Default)]
pub(super) struct SerializedPostings<T: AsRef<[u8]>> {
bytes: T,
}
impl Postings {
pub fn insert(&mut self, field: u8, posting: u32) {
self.fields.insert(field);
self.postings.push(posting);
}
pub fn insert_keyword(&mut self, field: u8) {
self.fields.insert(field);
}
}
impl<T: AsRef<[u8]>> SerializedPostings<T> {
pub fn new(bytes: T) -> Self {
SerializedPostings { bytes }
}
pub fn has_field(&self, field: u8) -> bool {
for byte in self.bytes.as_ref() {
match byte {
0xFF => return false,
_ if *byte == field => return true,
_ => {}
}
}
false
}
pub fn positions(&self) -> Vec<u32> {
self.into_iter().collect()
}
pub fn matches_positions(&self, positions: &[u32], offset: u32) -> bool {
let mut next_pos = self.into_iter().peekable();
for expect_pos in positions.iter().map(|pos| *pos + offset) {
while let Some(pos) = next_pos.peek() {
match pos.cmp(&expect_pos) {
Ordering::Less => {
next_pos.next();
}
Ordering::Equal => {
return true;
}
Ordering::Greater => {
break;
}
}
}
}
false
}
}
impl<'x, T: AsRef<[u8]>> IntoIterator for &'x SerializedPostings<T> {
type Item = u32;
type IntoIter = PostingsIterator<'x>;
fn into_iter(self) -> Self::IntoIter {
let bytes = self.bytes.as_ref();
for (bytes_offset, byte) in bytes.iter().enumerate() {
if *byte == 0xFF {
if let Some((items_left, bytes_read)) = bytes
.get(bytes_offset + 1..)
.and_then(|bytes| bytes.read_leb128::<usize>())
{
return PostingsIterator {
bytes,
bytes_offset: bytes_offset + bytes_read + 1,
items_left,
..Default::default()
};
}
break;
}
}
PostingsIterator::default()
}
}
#[derive(Default)]
pub(super) struct PostingsIterator<'x> {
bytes: &'x [u8],
bytes_offset: usize,
chunk: Vec<u32>,
chunk_offset: usize,
pub items_left: usize,
}
impl Iterator for PostingsIterator<'_> {
type Item = u32;
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.chunk.get(self.chunk_offset) {
self.chunk_offset += 1;
return Some(*item);
}
let block_len = match self.items_left {
0 => return None,
1..=31 => {
self.items_left -= 1;
let (item, bytes_read) = self.bytes.get(self.bytes_offset..)?.read_leb128()?;
self.bytes_offset += bytes_read;
return Some(item);
}
32..=127 => BitPacker1x::BLOCK_LEN,
128..=255 => BitPacker4x::BLOCK_LEN,
_ => BitPacker8x::BLOCK_LEN,
};
let bitpacker = TermIndexPacker::with_block_len(block_len);
let num_bits = *self.bytes.get(self.bytes_offset)?;
let bytes_read = ((num_bits as usize) * block_len / 8) + 1;
let initial_value = self.chunk.last().copied();
self.chunk = vec![0u32; block_len];
self.chunk_offset = 1;
bitpacker.decompress_strictly_sorted(
initial_value,
self.bytes
.get(self.bytes_offset + 1..self.bytes_offset + bytes_read)?,
&mut self.chunk[..],
num_bits,
);
self.bytes_offset += bytes_read;
self.items_left -= block_len;
self.chunk.first().copied()
}
}
impl Serialize for Postings {
fn serialize(self) -> Vec<u8> {
// Serialize fields
let mut serializer =
KeySerializer::new((self.fields.len() + 1) + (self.postings.len() * 2));
for field in self.fields {
serializer = serializer.write(field);
}
serializer = serializer.write(u8::MAX);
// Compress postings
if !self.postings.is_empty() {
let mut bitpacker = TermIndexPacker::new();
let mut compressed = vec![0u8; 4 * BitPacker8x::BLOCK_LEN];
let mut pos = 0;
let len = self.postings.len();
let mut initial_value = None;
serializer = serializer.write_leb128(len);
while pos < len {
let block_len = match len - pos {
0..=31 => {
for val in &self.postings[pos..] {
serializer = serializer.write_leb128(*val);
}
break;
}
32..=127 => BitPacker1x::BLOCK_LEN,
128..=255 => BitPacker4x::BLOCK_LEN,
_ => BitPacker8x::BLOCK_LEN,
};
let chunk = &self.postings[pos..pos + block_len];
bitpacker.block_len(block_len);
let num_bits: u8 = bitpacker.num_bits_strictly_sorted(initial_value, chunk);
let compressed_len = bitpacker.compress_strictly_sorted(
initial_value,
chunk,
&mut compressed[..],
num_bits,
);
serializer = serializer
.write(num_bits)
.write(&compressed[..compressed_len]);
initial_value = chunk[chunk.len() - 1].into();
pos += block_len;
}
}
serializer.finalize()
}
}
#[derive(Clone, Copy)]
pub(crate) struct TermIndexPacker {
bitpacker_1: BitPacker1x,
bitpacker_4: BitPacker4x,
bitpacker_8: BitPacker8x,
block_len: usize,
}
impl TermIndexPacker {
pub fn with_block_len(block_len: usize) -> Self {
TermIndexPacker {
bitpacker_1: BitPacker1x::new(),
bitpacker_4: BitPacker4x::new(),
bitpacker_8: BitPacker8x::new(),
block_len,
}
}
pub fn block_len(&mut self, num: usize) {
self.block_len = num;
}
}
impl BitPacker for TermIndexPacker {
const BLOCK_LEN: usize = 0;
fn new() -> Self {
TermIndexPacker {
bitpacker_1: BitPacker1x::new(),
bitpacker_4: BitPacker4x::new(),
bitpacker_8: BitPacker8x::new(),
block_len: 1,
}
}
fn compress(&self, decompressed: &[u32], compressed: &mut [u8], num_bits: u8) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => self
.bitpacker_8
.compress(decompressed, compressed, num_bits),
BitPacker4x::BLOCK_LEN => self
.bitpacker_4
.compress(decompressed, compressed, num_bits),
_ => self
.bitpacker_1
.compress(decompressed, compressed, num_bits),
}
}
fn compress_sorted(
&self,
initial: u32,
decompressed: &[u32],
compressed: &mut [u8],
num_bits: u8,
) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => {
self.bitpacker_8
.compress_sorted(initial, decompressed, compressed, num_bits)
}
BitPacker4x::BLOCK_LEN => {
self.bitpacker_4
.compress_sorted(initial, decompressed, compressed, num_bits)
}
_ => self
.bitpacker_1
.compress_sorted(initial, decompressed, compressed, num_bits),
}
}
fn decompress(&self, compressed: &[u8], decompressed: &mut [u32], num_bits: u8) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => {
self.bitpacker_8
.decompress(compressed, decompressed, num_bits)
}
BitPacker4x::BLOCK_LEN => {
self.bitpacker_4
.decompress(compressed, decompressed, num_bits)
}
_ => self
.bitpacker_1
.decompress(compressed, decompressed, num_bits),
}
}
fn decompress_sorted(
&self,
initial: u32,
compressed: &[u8],
decompressed: &mut [u32],
num_bits: u8,
) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => {
self.bitpacker_8
.decompress_sorted(initial, compressed, decompressed, num_bits)
}
BitPacker4x::BLOCK_LEN => {
self.bitpacker_4
.decompress_sorted(initial, compressed, decompressed, num_bits)
}
_ => self
.bitpacker_1
.decompress_sorted(initial, compressed, decompressed, num_bits),
}
}
fn num_bits(&self, decompressed: &[u32]) -> u8 {
match self.block_len {
BitPacker8x::BLOCK_LEN => self.bitpacker_8.num_bits(decompressed),
BitPacker4x::BLOCK_LEN => self.bitpacker_4.num_bits(decompressed),
_ => self.bitpacker_1.num_bits(decompressed),
}
}
fn num_bits_sorted(&self, initial: u32, decompressed: &[u32]) -> u8 {
match self.block_len {
BitPacker8x::BLOCK_LEN => self.bitpacker_8.num_bits_sorted(initial, decompressed),
BitPacker4x::BLOCK_LEN => self.bitpacker_4.num_bits_sorted(initial, decompressed),
_ => self.bitpacker_1.num_bits_sorted(initial, decompressed),
}
}
fn compress_strictly_sorted(
&self,
initial: Option<u32>,
decompressed: &[u32],
compressed: &mut [u8],
num_bits: u8,
) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => self.bitpacker_8.compress_strictly_sorted(
initial,
decompressed,
compressed,
num_bits,
),
BitPacker4x::BLOCK_LEN => self.bitpacker_4.compress_strictly_sorted(
initial,
decompressed,
compressed,
num_bits,
),
_ => self.bitpacker_1.compress_strictly_sorted(
initial,
decompressed,
compressed,
num_bits,
),
}
}
fn decompress_strictly_sorted(
&self,
initial: Option<u32>,
compressed: &[u8],
decompressed: &mut [u32],
num_bits: u8,
) -> usize {
match self.block_len {
BitPacker8x::BLOCK_LEN => self.bitpacker_8.decompress_strictly_sorted(
initial,
compressed,
decompressed,
num_bits,
),
BitPacker4x::BLOCK_LEN => self.bitpacker_4.decompress_strictly_sorted(
initial,
compressed,
decompressed,
num_bits,
),
_ => self.bitpacker_1.decompress_strictly_sorted(
initial,
compressed,
decompressed,
num_bits,
),
}
}
fn num_bits_strictly_sorted(&self, initial: Option<u32>, decompressed: &[u32]) -> u8 {
match self.block_len {
BitPacker8x::BLOCK_LEN => self
.bitpacker_8
.num_bits_strictly_sorted(initial, decompressed),
BitPacker4x::BLOCK_LEN => self
.bitpacker_4
.num_bits_strictly_sorted(initial, decompressed),
_ => self
.bitpacker_1
.num_bits_strictly_sorted(initial, decompressed),
}
}
}
#[cfg(test)]
mod tests {
use ahash::AHashMap;
use super::*;
#[test]
fn postings_roundtrip() {
for num_positions in [
1,
10,
BitPacker1x::BLOCK_LEN,
BitPacker4x::BLOCK_LEN,
BitPacker8x::BLOCK_LEN,
BitPacker8x::BLOCK_LEN + BitPacker4x::BLOCK_LEN + BitPacker1x::BLOCK_LEN,
BitPacker8x::BLOCK_LEN + BitPacker4x::BLOCK_LEN + BitPacker1x::BLOCK_LEN + 1,
(BitPacker8x::BLOCK_LEN * 3)
+ (BitPacker4x::BLOCK_LEN * 3)
+ (BitPacker1x::BLOCK_LEN * 3)
+ 1,
] {
println!("Testing block {num_positions}...",);
let mut postings = Postings::default();
for i in 0..num_positions {
postings.postings.push((i * i) as u32);
}
for fields in 0..std::cmp::min(10, num_positions) as u8 {
postings.fields.insert(fields);
}
let deserialized = SerializedPostings::new(postings.serialize());
let mut iter = (&deserialized).into_iter();
assert_eq!(
iter.items_left, num_positions,
"failed for num_positions: {}",
num_positions
);
for i in 0..num_positions {
assert_eq!(
iter.next(),
Some((i * i) as u32),
"failed for position: {}",
i
);
}
assert_eq!(iter.next(), None, "expected end of iterator");
for field in 0..std::cmp::min(10, num_positions) as u8 {
assert!(deserialized.has_field(field), "failed for field: {}", field);
}
assert_eq!(deserialized.positions().len(), num_positions);
}
}
#[test]
fn postings_match_positions() {
let mut maps: AHashMap<&str, Postings> = AHashMap::new();
let tokens = [
"the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog",
];
for (pos, word) in tokens.into_iter().enumerate() {
maps.entry(word).or_default().insert(0, pos as u32);
}
let maps: AHashMap<&str, SerializedPostings<Vec<u8>>> = maps
.into_iter()
.map(|(k, v)| (k, SerializedPostings::new(v.serialize())))
.collect();
let mut positions = Vec::new();
for (pos, word) in tokens.into_iter().enumerate() {
if pos > 0 {
assert!(maps[word].matches_positions(&positions, pos as u32));
} else {
positions = maps[word].positions();
}
}
}
}

View file

@ -22,32 +22,46 @@
*/
use std::{
borrow::Cow,
fmt::Display,
ops::{BitAndAssign, BitOrAssign, BitXorAssign},
};
use ahash::AHashSet;
use ahash::AHashMap;
use nlp::language::stemmer::Stemmer;
use roaring::RoaringBitmap;
use utils::codec::leb128::Leb128Reader;
use crate::{
backend::MAX_TOKEN_LENGTH,
fts::FtsFilter,
write::{BitmapClass, BitmapHash, ValueClass},
BitmapKey, Deserialize, Error, Store, ValueKey,
write::{
hash::TokenType, key::DeserializeBigEndian, BitmapHash, DynamicDocumentId, ValueClass,
},
BitmapKey, IterateParams, Store, ValueKey, U32_LEN,
};
use super::index::TERM_INDEX_VERSION;
use super::postings::SerializedPostings;
struct State<T: Into<u8> + Display + Clone + std::fmt::Debug> {
pub op: FtsFilter<T>,
struct State {
pub op: FtsTokenized,
pub bm: Option<RoaringBitmap>,
}
struct BigramIndex {
grams: Vec<[u8; 8]>,
enum FtsTokenized {
Exact {
tokens: Vec<(BitmapHash, u8)>,
},
Contains {
field: u8,
tokens: Vec<(BitmapHash, Option<BitmapHash>)>,
},
Keyword {
field: u8,
token: BitmapHash,
},
And,
Or,
Not,
End,
}
impl Store {
@ -58,107 +72,109 @@ impl Store {
filters: Vec<FtsFilter<T>>,
) -> crate::Result<RoaringBitmap> {
let collection = collection.into();
let mut not_mask = RoaringBitmap::new();
let mut not_fetch = false;
let mut state: State<T> = FtsFilter::And.into();
let mut stack = Vec::new();
let mut filters = filters.into_iter().peekable();
while let Some(filter) = filters.next() {
let mut result = match filter {
// Tokenize text
let mut tokenized_filters = Vec::with_capacity(filters.len());
let mut token_count = AHashMap::new();
for filter in filters {
let filter = match filter {
FtsFilter::Exact {
field,
text,
language,
} => {
let field: u8 = field.clone().into();
let mut keys = Vec::new();
let mut bigrams = AHashSet::new();
let mut last_token = Cow::Borrowed("");
let mut tokens = Vec::new();
let field = TokenType::word(field.into());
for token in language.tokenize_text(text.as_ref(), MAX_TOKEN_LENGTH) {
keys.push(BitmapKey {
account_id,
collection,
class: BitmapClass::word(token.word.as_ref(), field),
document_id: 0,
});
if !last_token.is_empty() {
bigrams.insert(
BitmapHash::new(&format!("{} {}", last_token, token.word)).hash,
);
}
last_token = token.word;
}
match keys.len().cmp(&1) {
std::cmp::Ordering::Less => None,
std::cmp::Ordering::Equal => self.get_bitmaps_intersection(keys).await?,
std::cmp::Ordering::Greater => {
if let Some(document_ids) = self.get_bitmaps_intersection(keys).await? {
let mut results = RoaringBitmap::new();
for document_id in document_ids {
if let Some(bigram_index) = self
.get_value::<BigramIndex>(ValueKey {
account_id,
collection,
document_id,
class: ValueClass::TermIndex,
})
.await?
{
if bigrams.iter().all(|bigram| {
bigram_index.grams.binary_search(bigram).is_ok()
}) {
results.insert(document_id);
}
}
}
if !results.is_empty() {
Some(results)
} else {
None
}
} else {
None
}
}
let hash = BitmapHash::new(token.word.as_ref());
token_count.entry(hash).and_modify(|c| *c += 1).or_insert(1);
tokens.push((hash, field));
}
FtsTokenized::Exact { tokens }
}
FtsFilter::Contains {
field,
text,
language,
} => {
let mut result = RoaringBitmap::new();
let field: u8 = field.clone().into();
let mut tokens = Vec::new();
for token in Stemmer::new(text.as_ref(), language, MAX_TOKEN_LENGTH) {
let token1 = BitmapKey {
account_id,
collection,
class: BitmapClass::word(token.word.as_ref(), field),
document_id: 0,
};
let token2 = BitmapKey {
account_id,
collection,
class: BitmapClass::stemmed(
if let Some(stemmed_word) = token.stemmed_word {
stemmed_word
} else {
token.word
}
.as_ref(),
field,
),
document_id: 0,
};
let hash = BitmapHash::new(token.word.as_ref());
let stemmed_hash = token.stemmed_word.as_deref().map(BitmapHash::new);
match self.get_bitmaps_union(vec![token1, token2]).await? {
token_count.entry(hash).and_modify(|c| *c += 1).or_insert(1);
if let Some(stemmed_hash) = stemmed_hash {
token_count
.entry(stemmed_hash)
.and_modify(|c| *c += 1)
.or_insert(1);
}
tokens.push((hash, stemmed_hash));
}
FtsTokenized::Contains {
field: field.into(),
tokens,
}
}
FtsFilter::Keyword { field, text } => {
let hash = BitmapHash::new(text);
token_count.entry(hash).and_modify(|c| *c += 1).or_insert(1);
FtsTokenized::Keyword {
field: field.into(),
token: hash,
}
}
FtsFilter::And => FtsTokenized::And,
FtsFilter::Or => FtsTokenized::Or,
FtsFilter::Not => FtsTokenized::Not,
FtsFilter::End => FtsTokenized::End,
};
tokenized_filters.push(filter);
}
let mut not_mask = RoaringBitmap::new();
let mut not_fetch = false;
let mut state: State = FtsTokenized::And.into();
let mut stack = Vec::new();
let mut token_cache = AHashMap::with_capacity(token_count.len());
let mut filters = tokenized_filters.into_iter().peekable();
while let Some(filter) = filters.next() {
let mut result = match filter {
FtsTokenized::Exact { tokens } => {
self.get_postings(
account_id,
collection,
&tokens,
&token_count,
&mut token_cache,
true,
)
.await?
}
FtsTokenized::Contains { field, tokens } => {
let mut result = RoaringBitmap::new();
for (token, stemmed_token) in tokens {
match self
.get_postings(
account_id,
collection,
&[
(token, TokenType::word(field)),
(stemmed_token.unwrap_or(token), TokenType::stemmed(field)),
],
&token_count,
&mut token_cache,
false,
)
.await?
{
Some(b) if !b.is_empty() => {
if !result.is_empty() {
result &= b;
@ -179,21 +195,23 @@ impl Store {
None
}
}
FtsFilter::Keyword { field, text } => {
self.get_bitmap(BitmapKey {
FtsTokenized::Keyword { field, token } => {
self.get_postings(
account_id,
collection,
class: BitmapClass::word(text, field),
document_id: 0,
})
&[(token, TokenType::word(field))],
&token_count,
&mut token_cache,
false,
)
.await?
}
op @ (FtsFilter::And | FtsFilter::Or | FtsFilter::Not) => {
op @ (FtsTokenized::And | FtsTokenized::Or | FtsTokenized::Not) => {
stack.push(state);
state = op.into();
continue;
}
FtsFilter::End => {
FtsTokenized::End => {
if let Some(prev_state) = stack.pop() {
let bm = state.bm;
state = prev_state;
@ -205,7 +223,7 @@ impl Store {
};
// Only fetch the NOT mask when it is actually needed
if matches!(state.op, FtsFilter::Not) && !not_fetch {
if matches!(state.op, FtsTokenized::Not) && !not_fetch {
not_mask = self
.get_bitmap(BitmapKey::document_ids(account_id, collection))
.await?
@ -216,19 +234,19 @@ impl Store {
// Apply logical operation
if let Some(dest) = &mut state.bm {
match state.op {
FtsFilter::And => {
FtsTokenized::And => {
if let Some(result) = result {
dest.bitand_assign(result);
} else {
dest.clear();
}
}
FtsFilter::Or => {
FtsTokenized::Or => {
if let Some(result) = result {
dest.bitor_assign(result);
}
}
FtsFilter::Not => {
FtsTokenized::Not => {
if let Some(mut result) = result {
result.bitxor_assign(&not_mask);
dest.bitand_assign(result);
@ -237,20 +255,20 @@ impl Store {
_ => unreachable!(),
}
} else if let Some(ref mut result_) = result {
if let FtsFilter::Not = state.op {
if let FtsTokenized::Not = state.op {
result_.bitxor_assign(&not_mask);
}
state.bm = result;
} else if let FtsFilter::Not = state.op {
} else if let FtsTokenized::Not = state.op {
state.bm = Some(not_mask.clone());
} else {
state.bm = Some(RoaringBitmap::new());
}
// Short-circuit AND: if the intermediate result is already empty, skip the remaining filters in this group
if matches!(state.op, FtsFilter::And) && state.bm.as_ref().unwrap().is_empty() {
if matches!(state.op, FtsTokenized::And) && state.bm.as_ref().unwrap().is_empty() {
while let Some(filter) = filters.peek() {
if matches!(filter, FtsFilter::End) {
if matches!(filter, FtsTokenized::End) {
break;
} else {
filters.next();
@ -262,50 +280,148 @@ impl Store {
Ok(state.bm.unwrap_or_default())
}
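// Illustrative sketch (not part of the diff): the Not branch above complements a
// result by XOR-ing it with the mask of all existing document ids, so "NOT matched"
// stays restricted to documents that actually exist. Assumes only the `roaring`
// crate and that `matched` is a subset of `all_docs`.
use roaring::RoaringBitmap;

fn complement_within(all_docs: &RoaringBitmap, matched: &RoaringBitmap) -> RoaringBitmap {
    // XOR with the full mask flips membership for every existing document id.
    matched ^ all_docs
}

fn main() {
    let all_docs: RoaringBitmap = (0u32..6).collect();
    let matched: RoaringBitmap = [1u32, 3].into_iter().collect();
    let expected: RoaringBitmap = [0u32, 2, 4, 5].into_iter().collect();
    assert_eq!(complement_within(&all_docs, &matched), expected);
}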
async fn get_bitmaps_union(
async fn get_postings(
&self,
keys: Vec<BitmapKey<BitmapClass<u32>>>,
account_id: u32,
collection: u8,
tokens: &[(BitmapHash, u8)],
token_count: &AHashMap<BitmapHash, u32>,
token_cache: &mut AHashMap<BitmapHash, AHashMap<u32, SerializedPostings<Vec<u8>>>>,
is_intersect: bool,
) -> crate::Result<Option<RoaringBitmap>> {
let mut bm = RoaringBitmap::new();
let mut result_bm = RoaringBitmap::new();
let mut position_candidates = AHashMap::new();
let num_tokens = tokens.len();
for key in keys {
if let Some(items) = self.get_bitmap(key).await? {
bm.bitor_assign(items);
for (pos, (token, field)) in tokens.iter().enumerate() {
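// Postings for tokens that occur more than once in the query are kept in
// `token_cache` so they are read from the store only a single time.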
let needs_caching = token_count[token] > 1;
let is_first = pos == 0;
let mut bm = RoaringBitmap::new();
if needs_caching {
// Try to fetch from cache
if let Some(postings) = token_cache.get(token) {
for (document_id, postings) in postings {
if postings.has_field(*field) {
if is_intersect {
if is_first {
if num_tokens > 1 {
position_candidates
.insert(*document_id, postings.positions());
}
bm.insert(*document_id);
} else if position_candidates
.get(document_id)
.map_or(false, |positions| {
postings.matches_positions(positions, pos as u32)
})
{
bm.insert(*document_id);
}
} else {
result_bm.insert(*document_id);
}
}
}
if is_intersect {
if is_first {
result_bm = bm;
} else {
result_bm &= bm;
}
if result_bm.is_empty() {
return Ok(None);
}
}
continue;
}
// Insert an empty cache entry so later occurrences of this token hit the cache, even if no postings are found
token_cache.insert(*token, AHashMap::new());
}
// Fetch from store
let key_len = ValueClass::FtsIndex::<DynamicDocumentId>(*token).serialized_size();
self.iterate(
IterateParams::new(
ValueKey {
account_id,
collection,
document_id: 0,
class: ValueClass::FtsIndex(*token),
},
ValueKey {
account_id,
collection,
document_id: u32::MAX,
class: ValueClass::FtsIndex(*token),
},
),
|key, value| {
if key.len() != key_len {
return Ok(true);
}
// Make sure this document contains the field
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
let postings = SerializedPostings::new(value);
if postings.has_field(*field) {
if is_intersect {
if is_first {
if num_tokens > 1 {
position_candidates.insert(document_id, postings.positions());
}
bm.insert(document_id);
} else if position_candidates
.get(&document_id)
.map_or(false, |positions| {
postings.matches_positions(positions, pos as u32)
})
{
bm.insert(document_id);
}
} else {
result_bm.insert(document_id);
}
}
// Cache the postings if needed
if needs_caching {
token_cache
.entry(*token)
.or_default()
.insert(document_id, SerializedPostings::new(value.to_vec()));
}
Ok(true)
},
)
.await?;
if is_intersect {
if is_first {
result_bm = bm;
} else {
result_bm &= bm;
}
if result_bm.is_empty() {
return Ok(None);
}
}
}
Ok(if !bm.is_empty() { Some(bm) } else { None })
Ok(if !result_bm.is_empty() {
Some(result_bm)
} else {
None
})
}
}
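// Hedged sketch of the positional intersection used by `get_postings` when
// `is_intersect` is true: the first token's positions seed the candidate set, and a
// document keeps matching only if the token at query offset `pos` occurs at some
// candidate position plus `pos`. The helper below is hypothetical; the real check
// lives in `SerializedPostings::matches_positions`.
fn matches_positions(token_positions: &[u32], candidates: &[u32], offset: u32) -> bool {
    // `token_positions` must be sorted, as serialized postings are.
    candidates
        .iter()
        .any(|p| token_positions.binary_search(&(p + offset)).is_ok())
}

fn main() {
    // A document contains "quick" at positions [2, 9] and "fox" at [4, 11].
    let quick = [2u32, 9];
    let fox = [4u32, 11];
    // Phrase query where "fox" is expected two positions after "quick": 2 + 2 == 4.
    assert!(matches_positions(&fox, &quick, 2));
    // No candidate position plus 5 lands on a "fox" position (7 and 14 are absent).
    assert!(!matches_positions(&fox, &quick, 5));
}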
impl Deserialize for BigramIndex {
fn deserialize(bytes: &[u8]) -> crate::Result<Self> {
if bytes.first().copied().unwrap_or_default() != TERM_INDEX_VERSION {
return Err(Error::InternalError(
"Unsupported term index version".to_string(),
));
}
let bytes = lz4_flex::decompress_size_prepended(bytes.get(1..).unwrap_or_default())
.map_err(|_| Error::InternalError("Failed to decompress term index".to_string()))?;
let (num_items, pos) = bytes.read_leb128::<usize>().ok_or(Error::InternalError(
"Failed to read term index marker".to_string(),
))?;
bytes
.get(pos..pos + (num_items * 8))
.map(|bytes| Self {
grams: bytes
.chunks_exact(8)
.map(|chunk| chunk.try_into().unwrap())
.collect(),
})
.ok_or_else(|| Error::InternalError("Failed to read term index".to_string()))
}
}
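// Hedged sketch of the legacy term-index value that the deserializer above reads:
// a version byte, followed by an lz4 size-prepended block whose payload is a LEB128
// item count and then `num_items` 8-byte grams. The concrete TERM_INDEX_VERSION
// value is not shown in this diff, so it is passed in here.
fn encode_legacy_term_index(grams: &[[u8; 8]], version: u8) -> Vec<u8> {
    let mut payload = Vec::with_capacity(1 + grams.len() * 8);
    payload.push(grams.len() as u8); // LEB128: a single byte suffices for < 128 items
    for gram in grams {
        payload.extend_from_slice(gram);
    }
    let mut value = vec![version];
    value.extend_from_slice(&lz4_flex::compress_prepend_size(&payload));
    value
}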
impl<T: Into<u8> + Display + Clone + std::fmt::Debug> From<FtsFilter<T>> for State<T> {
fn from(value: FtsFilter<T>) -> Self {
impl From<FtsTokenized> for State {
fn from(value: FtsTokenized) -> Self {
Self {
op: value,
bm: None,

View file

@ -173,7 +173,7 @@ pub const SUBSPACE_BITMAP_ID: u8 = b'b';
pub const SUBSPACE_BITMAP_TAG: u8 = b'c';
pub const SUBSPACE_BITMAP_TEXT: u8 = b'v';
pub const SUBSPACE_DIRECTORY: u8 = b'd';
pub const SUBSPACE_FTS_INDEX: u8 = b'f';
pub const SUBSPACE_FTS_QUEUE: u8 = b'f';
pub const SUBSPACE_INDEXES: u8 = b'i';
pub const SUBSPACE_BLOB_RESERVE: u8 = b'j';
pub const SUBSPACE_BLOB_LINK: u8 = b'k';
@ -188,7 +188,7 @@ pub const SUBSPACE_QUEUE_EVENT: u8 = b'q';
pub const SUBSPACE_QUOTA: u8 = b'u';
pub const SUBSPACE_REPORT_OUT: u8 = b'h';
pub const SUBSPACE_REPORT_IN: u8 = b'r';
pub const SUBSPACE_TERM_INDEX: u8 = b'g';
pub const SUBSPACE_FTS_INDEX: u8 = b'g';
pub const SUBSPACE_RESERVED_1: u8 = b'o';
pub const SUBSPACE_RESERVED_2: u8 = b'w';

View file

@ -1,224 +0,0 @@
/*
* Copyright (c) 2023 Stalwart Labs Ltd.
*
* This file is part of the Stalwart Mail Server.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* in the LICENSE file at the top-level directory of this distribution.
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* You can be released from the requirements of the AGPLv3 license by
* purchasing a commercial license. Please contact licensing@stalw.art
* for more details.
*/
use ahash::AHashSet;
use roaring::RoaringBitmap;
pub const WORD_SIZE_BITS: u32 = (WORD_SIZE * 8) as u32;
pub const WORD_SIZE: usize = std::mem::size_of::<u128>();
pub const WORDS_PER_BLOCK: u32 = 8;
pub const BITS_PER_BLOCK: u32 = WORD_SIZE_BITS * WORDS_PER_BLOCK;
pub const BITS_MASK: u32 = BITS_PER_BLOCK - 1;
pub const BITMAP_SIZE: usize = WORD_SIZE * WORDS_PER_BLOCK as usize;
pub struct DenseBitmap {
pub bitmap: [u8; BITMAP_SIZE],
}
impl DenseBitmap {
pub fn empty() -> Self {
Self {
bitmap: [0; BITMAP_SIZE],
}
}
pub fn full() -> Self {
Self {
bitmap: [u8::MAX; BITMAP_SIZE],
}
}
pub fn set(&mut self, index: u32) {
let index = index & BITS_MASK;
self.bitmap[(index / 8) as usize] |= 1 << (index & 7);
}
pub fn clear(&mut self, index: u32) {
let index = index & BITS_MASK;
self.bitmap[(index / 8) as usize] &= !(1 << (index & 7));
}
#[inline(always)]
pub fn block_num(index: u32) -> u32 {
index / BITS_PER_BLOCK
}
#[inline(always)]
pub fn block_index(index: u32) -> u32 {
index & BITS_MASK
}
}
pub trait DeserializeBlock {
fn deserialize_block(&mut self, bytes: &[u8], block_num: u32);
fn deserialize_word(&mut self, word: &[u8], block_num: u32, word_num: u32);
}
pub fn next_available_index(
bytes: &[u8],
block_num: u32,
reserved_ids: &AHashSet<u32>,
) -> Option<u32> {
'outer: for (byte_pos, byte) in bytes.iter().enumerate() {
if *byte != u8::MAX {
let mut index = 0;
loop {
while (byte >> index) & 1 == 1 {
index += 1;
if index == 8 {
continue 'outer;
}
}
let id = (block_num * BITS_PER_BLOCK) + ((byte_pos * 8) + index) as u32;
if !reserved_ids.contains(&id) {
return Some(id);
} else if index < 7 {
index += 1;
continue;
} else {
continue 'outer;
}
}
}
}
None
}
pub fn block_contains(bytes: &[u8], block_num: u32, document_id: u32) -> bool {
'outer: for (byte_pos, byte) in bytes.iter().enumerate() {
if *byte != 0 {
let mut index = 0;
loop {
while (byte >> index) & 1 == 0 {
index += 1;
if index == 8 {
continue 'outer;
}
}
let id = (block_num * BITS_PER_BLOCK) + ((byte_pos * 8) + index) as u32;
if id == document_id {
return true;
} else if index < 7 {
index += 1;
continue;
} else {
continue 'outer;
}
}
}
}
false
}
impl DeserializeBlock for RoaringBitmap {
fn deserialize_block(&mut self, bytes: &[u8], block_num: u32) {
debug_assert_eq!(bytes.len(), BITMAP_SIZE);
self.deserialize_word(&bytes[..WORD_SIZE], block_num, 0);
self.deserialize_word(&bytes[WORD_SIZE..WORD_SIZE * 2], block_num, 1);
self.deserialize_word(&bytes[WORD_SIZE * 2..WORD_SIZE * 3], block_num, 2);
self.deserialize_word(&bytes[WORD_SIZE * 3..WORD_SIZE * 4], block_num, 3);
self.deserialize_word(&bytes[WORD_SIZE * 4..WORD_SIZE * 5], block_num, 4);
self.deserialize_word(&bytes[WORD_SIZE * 5..WORD_SIZE * 6], block_num, 5);
self.deserialize_word(&bytes[WORD_SIZE * 6..WORD_SIZE * 7], block_num, 6);
self.deserialize_word(&bytes[WORD_SIZE * 7..], block_num, 7);
}
#[inline(always)]
fn deserialize_word(&mut self, word: &[u8], block_num: u32, word_num: u32) {
match u128::from_le_bytes(word.try_into().unwrap()) {
0 => (),
u128::MAX => {
self.insert_range(
block_num * BITS_PER_BLOCK + word_num * WORD_SIZE_BITS
..(block_num * BITS_PER_BLOCK + word_num * WORD_SIZE_BITS) + WORD_SIZE_BITS,
);
}
mut word => {
while word != 0 {
let trailing_zeros = word.trailing_zeros();
self.insert(
block_num * BITS_PER_BLOCK + word_num * WORD_SIZE_BITS + trailing_zeros,
);
word ^= 1 << trailing_zeros;
}
}
}
}
}
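// Worked example (illustrative only) of the word decoding above: with
// BITS_PER_BLOCK = 1024 and WORD_SIZE_BITS = 128, block 0 / word 1 starts at absolute
// id 128, and each set bit contributes its trailing-zero offset from that base.
fn main() {
    let (block_num, word_num) = (0u32, 1u32);
    let base = block_num * 1024 + word_num * 128;
    let mut word: u128 = 0b1001; // bits 0 and 3 set
    let mut ids = Vec::new();
    while word != 0 {
        let tz = word.trailing_zeros();
        ids.push(base + tz);
        word ^= 1u128 << tz;
    }
    assert_eq!(ids, vec![128, 131]);
}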
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use ahash::AHashSet;
use roaring::RoaringBitmap;
use super::*;
#[test]
fn serialize_bitmap_block() {
for range in [(0..128), (128..256), (5076..5093), (1762..19342)] {
let mut blocks = HashMap::new();
let mut bitmap = RoaringBitmap::new();
for item in range {
bitmap.insert(item);
blocks
.entry(item / BITS_PER_BLOCK)
.or_insert_with(DenseBitmap::empty)
.set(item);
}
let mut bitmap_blocks = RoaringBitmap::new();
for (block_num, dense_bitmap) in blocks {
bitmap_blocks.deserialize_block(&dense_bitmap.bitmap, block_num);
}
assert_eq!(bitmap, bitmap_blocks);
}
}
#[test]
fn get_next_available_index() {
let eh = AHashSet::new();
let mut uh = AHashSet::new();
let mut bm = DenseBitmap::empty();
for id in 0..1024 {
uh.insert(id);
assert_eq!(
next_available_index(&bm.bitmap, 0, &eh),
Some(id),
"failed for {id}"
);
assert_eq!(
next_available_index(&bm.bitmap, 0, &uh),
if id < 1023 { Some(id + 1) } else { None },
"reserved id failed for {id}"
);
bm.set(id);
}
}
}

View file

@ -39,13 +39,6 @@ impl<T> BitmapClass<T> {
token: BitmapHash::new(token),
}
}
/*pub fn bigram(token: impl AsRef<[u8]>, field: impl Into<u8>) -> Self {
BitmapClass::Text {
field: field.into() | 1 << 7,
token: BitmapHash::new(token),
}
}*/
}
impl BitmapHash {
@ -86,8 +79,4 @@ impl TokenType {
pub fn stemmed(field: u8) -> u8 {
1 << 7 | field
}
/*pub fn bigram(field: u8) -> u8 {
1 << 7 | field
}*/
}
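// Hedged illustration of the token-type encoding above: the high bit of the field
// byte marks stemmed postings, while exact-word postings are assumed to use the
// field value unchanged (TokenType::word is not shown in this hunk).
fn main() {
    let field: u8 = 3;
    let stemmed = 1u8 << 7 | field; // what TokenType::stemmed(3) produces
    assert_eq!(stemmed, 0x83);
    assert_eq!(stemmed & !(1u8 << 7), field); // strip the flag to recover the field
    assert!(stemmed & (1u8 << 7) != 0); // flag set => stemmed token
}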

View file

@ -28,13 +28,13 @@ use crate::{
BitmapKey, Deserialize, IndexKey, IndexKeyPrefix, Key, LogKey, ValueKey, SUBSPACE_ACL,
SUBSPACE_BITMAP_ID, SUBSPACE_BITMAP_TAG, SUBSPACE_BITMAP_TEXT, SUBSPACE_BLOB_LINK,
SUBSPACE_BLOB_RESERVE, SUBSPACE_COUNTER, SUBSPACE_DIRECTORY, SUBSPACE_FTS_INDEX,
SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_VALUE, SUBSPACE_PROPERTY,
SUBSPACE_FTS_QUEUE, SUBSPACE_INDEXES, SUBSPACE_LOGS, SUBSPACE_LOOKUP_VALUE, SUBSPACE_PROPERTY,
SUBSPACE_QUEUE_EVENT, SUBSPACE_QUEUE_MESSAGE, SUBSPACE_QUOTA, SUBSPACE_REPORT_OUT,
SUBSPACE_SETTINGS, SUBSPACE_TERM_INDEX, U32_LEN, U64_LEN, WITH_SUBSPACE,
SUBSPACE_SETTINGS, U32_LEN, U64_LEN, WITH_SUBSPACE,
};
use super::{
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, IndexEmailClass, LookupClass,
AnyKey, AssignedIds, BitmapClass, BlobOp, DirectoryClass, FtsQueueClass, LookupClass,
QueueClass, ReportClass, ReportEvent, ResolveId, TagValue, ValueClass,
};
@ -262,24 +262,38 @@ impl<T: ResolveId> ValueClass<T> {
.write(collection)
.write(*field)
.write(document_id),
ValueClass::TermIndex => serializer
.write(account_id)
ValueClass::FtsIndex(hash) => {
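// Key layout: account_id ++ first min(len, 8) bytes of the token hash ++
// (length byte, only when the full 8-byte prefix was written) ++ collection ++
// document_id.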
let serializer = serializer.write(account_id).write(
hash.hash
.get(0..std::cmp::min(hash.len as usize, 8))
.unwrap(),
);
if hash.len >= 8 {
serializer.write(hash.len)
} else {
serializer
}
.write(collection)
.write_leb128(document_id),
.write(document_id)
}
ValueClass::Acl(grant_account_id) => serializer
.write(*grant_account_id)
.write(account_id)
.write(collection)
.write(document_id),
ValueClass::IndexEmail(index) => match index {
IndexEmailClass::Insert { seq, hash } => serializer
ValueClass::FtsQueue(index) => match index {
FtsQueueClass::Insert { seq, hash } => serializer
.write(*seq)
.write(account_id)
.write(collection)
.write(document_id)
.write::<&[u8]>(hash.as_ref()),
IndexEmailClass::Delete { seq } => {
serializer.write(*seq).write(account_id).write(document_id)
}
FtsQueueClass::Delete { seq } => serializer
.write(*seq)
.write(account_id)
.write(collection)
.write(document_id),
},
ValueClass::Blob(op) => match op {
BlobOp::Reserve { hash, until } => serializer
@ -515,7 +529,14 @@ impl<T: AsRef<[u8]> + Sync + Send> Key for AnyKey<T> {
impl<T> ValueClass<T> {
pub fn serialized_size(&self) -> usize {
match self {
ValueClass::Property(_) | ValueClass::TermIndex => U32_LEN * 2 + 3,
ValueClass::Property(_) => U32_LEN * 2 + 3,
ValueClass::FtsIndex(hash) => {
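// Mirrors the key writer: long tokens take 8 hash bytes plus a length byte,
// short tokens take `len` hash bytes; both add account_id + document_id
// (U32_LEN * 2) and the collection byte.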
if hash.len >= 8 {
U32_LEN * 2 + 10
} else {
hash.len as usize + U32_LEN * 2 + 1
}
}
ValueClass::Acl(_) => U32_LEN * 3 + 2,
ValueClass::Lookup(LookupClass::Counter(v) | LookupClass::Key(v))
| ValueClass::Config(v) => v.len(),
@ -532,7 +553,7 @@ impl<T> ValueClass<T> {
BLOB_HASH_LEN + U32_LEN * 2 + 2
}
},
ValueClass::IndexEmail { .. } => BLOB_HASH_LEN + U64_LEN * 2,
ValueClass::FtsQueue { .. } => BLOB_HASH_LEN + U64_LEN * 2,
ValueClass::Queue(q) => match q {
QueueClass::Message(_) => U64_LEN,
QueueClass::MessageEvent(_) => U64_LEN * 2,
@ -557,9 +578,9 @@ impl<T> ValueClass<T> {
SUBSPACE_PROPERTY
}
}
ValueClass::TermIndex => SUBSPACE_TERM_INDEX,
ValueClass::Acl(_) => SUBSPACE_ACL,
ValueClass::IndexEmail { .. } => SUBSPACE_FTS_INDEX,
ValueClass::FtsIndex(_) => SUBSPACE_FTS_INDEX,
ValueClass::FtsQueue { .. } => SUBSPACE_FTS_QUEUE,
ValueClass::Blob(op) => match op {
BlobOp::Reserve { .. } => SUBSPACE_BLOB_RESERVE,
BlobOp::Commit { .. } | BlobOp::Link { .. } | BlobOp::LinkId { .. } => {

View file

@ -44,7 +44,6 @@ use self::assert::AssertValue;
pub mod assert;
pub mod batch;
pub mod bitmap;
pub mod blob;
pub mod hash;
pub mod key;
@ -164,17 +163,17 @@ pub enum ValueClass<T> {
Property(u8),
Acl(u32),
Lookup(LookupClass),
TermIndex,
FtsIndex(BitmapHash),
FtsQueue(FtsQueueClass),
Directory(DirectoryClass<T>),
Blob(BlobOp),
IndexEmail(IndexEmailClass),
Config(Vec<u8>),
Queue(QueueClass),
Report(ReportClass),
}
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub enum IndexEmailClass {
pub enum FtsQueueClass {
Insert { seq: u64, hash: BlobHash },
Delete { seq: u64 },
}

View file

@ -5,8 +5,8 @@ edition = "2021"
resolver = "2"
[features]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "foundationdb"]
default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis"]
#default = ["sqlite", "postgres", "mysql", "rocks", "elastic", "s3", "redis", "foundationdb"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation"]
postgres = ["store/postgres"]

View file

@ -92,8 +92,11 @@ pub async fn test(db: Store) {
batch.set(ValueClass::Property(idx as u8), random_bytes(value_size));
}
for value_size in [16, 128, 1024, 2056, 102400] {
batch.set(ValueClass::TermIndex, random_bytes(value_size));
for value_size in [1, 4, 7, 8, 9, 16] {
batch.set(
ValueClass::FtsIndex(BitmapHash::new(random_bytes(value_size))),
random_bytes(value_size * 2),
);
}
for grant_account_id in 0u32..10u32 {
@ -302,7 +305,7 @@ impl Snapshot {
(SUBSPACE_BITMAP_TAG, false),
(SUBSPACE_BITMAP_TEXT, false),
(SUBSPACE_DIRECTORY, true),
(SUBSPACE_FTS_INDEX, true),
(SUBSPACE_FTS_QUEUE, true),
(SUBSPACE_INDEXES, false),
(SUBSPACE_BLOB_RESERVE, true),
(SUBSPACE_BLOB_LINK, true),
@ -317,7 +320,7 @@ impl Snapshot {
(SUBSPACE_QUOTA, !is_sql),
(SUBSPACE_REPORT_OUT, true),
(SUBSPACE_REPORT_IN, true),
(SUBSPACE_TERM_INDEX, true),
(SUBSPACE_FTS_INDEX, true),
] {
let from_key = AnyKey {
subspace,