CLI import/export tested.

This commit is contained in:
mdecimus 2023-07-06 17:28:45 +02:00
parent 1f4204c6bf
commit fac3273f10
14 changed files with 397 additions and 130 deletions

View file

@ -31,6 +31,7 @@ use console::style;
use jmap_client::client::{Client, Credentials};
use modules::{
cli::{Cli, Commands},
database::cmd_database,
export::cmd_export,
get,
import::cmd_import,
@ -64,19 +65,14 @@ async fn main() -> std::io::Result<()> {
};
if is_jmap {
let client = Client::new()
.credentials(credentials)
.accept_invalid_certs(is_localhost(&args.url))
.connect(&args.url)
.await
.unwrap_or_else(|err| {
eprintln!("Failed to connect to JMAP server {}: {}.", args.url, err);
std::process::exit(1);
});
match args.command {
Commands::Import(command) => cmd_import(client, command).await,
Commands::Export(command) => cmd_export(client, command).await,
Commands::Import(command) => {
cmd_import(build_client(&args.url, credentials).await, command).await
}
Commands::Export(command) => {
cmd_export(build_client(&args.url, credentials).await, command).await
}
Commands::Database(command) => cmd_database(&args.url, credentials, command).await,
Commands::Queue(_) | Commands::Report(_) => unreachable!(),
}
} else {
@ -90,6 +86,18 @@ async fn main() -> std::io::Result<()> {
Ok(())
}
async fn build_client(url: &str, credentials: Credentials) -> Client {
Client::new()
.credentials(credentials)
.accept_invalid_certs(is_localhost(url))
.connect(url)
.await
.unwrap_or_else(|err| {
eprintln!("Failed to connect to JMAP server {}: {}.", url, err);
std::process::exit(1);
})
}
fn parse_credentials(credentials: &str) -> Credentials {
if let Some((account, secret)) = credentials.split_once(':') {
Credentials::basic(account, secret)

View file

@ -49,6 +49,10 @@ pub enum Commands {
#[clap(subcommand)]
Export(ExportCommands),
/// Manage JMAP database
#[clap(subcommand)]
Database(DatabaseCommands),
/// Manage SMTP message queue
#[clap(subcommand)]
Queue(QueueCommands),
@ -106,6 +110,26 @@ pub enum ExportCommands {
},
}
#[derive(Subcommand)]
pub enum DatabaseCommands {
/// Delete a JMAP account
Delete {
/// Account name to delete
account: String,
},
/// Rename a JMAP account
Rename {
/// Account name to rename
account: String,
/// New account name
new_account: String,
},
/// Purge expired blobs
Purge {},
}
#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum MailboxFormat {
/// Mbox format

View file

@ -0,0 +1,39 @@
use jmap_client::client::Credentials;
use reqwest::header::AUTHORIZATION;
use super::{cli::DatabaseCommands, is_localhost, UnwrapResult};
pub async fn cmd_database(url: &str, credentials: Credentials, command: DatabaseCommands) {
let url = match command {
DatabaseCommands::Delete { account } => format!("{}/admin/account/delete/{}", url, account),
DatabaseCommands::Rename {
account,
new_account,
} => format!("{}/admin/account/rename/{}/{}", url, account, new_account),
DatabaseCommands::Purge {} => format!("{}/admin/blob/purge", url),
};
let response = reqwest::Client::builder()
.danger_accept_invalid_certs(is_localhost(&url))
.build()
.unwrap_or_default()
.get(url)
.header(
AUTHORIZATION,
match credentials {
Credentials::Basic(s) => format!("Basic {s}"),
Credentials::Bearer(s) => format!("Bearer {s}"),
},
)
.send()
.await
.unwrap_result("send GET request");
if response.status().is_success() {
eprintln!("Success.");
} else {
eprintln!(
"Request Failed: {}",
response.text().await.unwrap_result("fetch text")
);
}
}

View file

@ -460,7 +460,7 @@ async fn import_mailboxes(client: &Client, path: &Path) -> HashMap<String, Strin
if !matches!(mailbox.role(), Role::None) {
if let Some(existing_mailbox) = existing_mailboxes
.iter()
.find(|m| m.role() != mailbox.role())
.find(|m| m.role() == mailbox.role())
{
id_mappings.insert(
id.to_string(),
@ -484,6 +484,7 @@ async fn import_mailboxes(client: &Client, path: &Path) -> HashMap<String, Strin
}
}
let mut total_imported = 0;
let mut total_existing = 0;
if !id_missing.is_empty() {
let mut request = client.build();
let set_request = request.set_mailbox();
@ -492,6 +493,7 @@ async fn import_mailboxes(client: &Client, path: &Path) -> HashMap<String, Strin
// Skip if mailbox already exists
let id = mailbox.id().unwrap_result("obtain mailbox id").to_string();
if id_mappings.contains_key(&id) {
total_existing += 1;
continue;
}
let create_request = set_request
@ -533,9 +535,16 @@ async fn import_mailboxes(client: &Client, path: &Path) -> HashMap<String, Strin
);
total_imported += 1;
}
} else {
total_existing = mailboxes.len();
}
eprintln!("Successfully imported {} mailboxes.", total_imported);
eprintln!(
"Successfully processed {} mailboxes ({} imported, {} already exist).",
total_existing + total_imported,
total_imported,
total_existing
);
id_mappings
}
@ -564,19 +573,19 @@ async fn import_emails(
.await;
let existing_ids = existing_emails
.iter()
.filter_map(|email| email.message_id())
.map(|email| (email.message_id(), email.received_at()))
.collect::<HashSet<_>>();
let mut futures = FuturesUnordered::new();
let total_imported = Arc::new(AtomicUsize::from(0));
let mut total_existing = 0;
let mut path = PathBuf::from(path);
path.push("blobs");
for email in emails {
// Skip messages that already exist in the server
if let Some(message_ids) = email.message_id() {
if existing_ids.contains(message_ids) {
continue;
}
if existing_ids.contains(&(email.message_id(), email.received_at())) {
total_existing += 1;
continue;
}
// Spawn import tasks
@ -668,8 +677,10 @@ async fn import_emails(
// Done
eprintln!(
"Successfully imported {} messages.",
total_imported.load(Ordering::Relaxed)
"Successfully processed {} emails ({} imported, {} already exist).",
total_imported.load(Ordering::Relaxed) + total_existing,
total_imported.load(Ordering::Relaxed),
total_existing
);
}
@ -694,11 +705,13 @@ async fn import_sieve_scripts(client: &Client, path: &Path, num_concurrent: usiz
// Spawn tasks
let mut futures = FuturesUnordered::new();
let total_imported = Arc::new(AtomicUsize::from(0));
let mut total_existing = 0;
'outer: for script in scripts {
// Skip scripts that already exist
for existing_script in &existing_scripts {
if existing_script.name() == script.name() {
total_existing += 1;
continue 'outer;
}
}
@ -770,8 +783,10 @@ async fn import_sieve_scripts(client: &Client, path: &Path, num_concurrent: usiz
// Done
eprintln!(
"Successfully imported {} sieve script.",
total_imported.load(Ordering::Relaxed)
"Successfully processed {} sieve scripts ({} imported, {} already exist).",
total_imported.load(Ordering::Relaxed) + total_existing,
total_imported.load(Ordering::Relaxed),
total_existing
);
}
@ -785,12 +800,14 @@ async fn import_identities(client: &Client, path: &Path) {
let mut request = client.build();
let set_request = request.set_identity();
let mut create_ids = Vec::new();
let mut total_existing = 0;
'outer: for identity in &identities {
for existing_identity in &existing_identities {
if identity.name() == existing_identity.name()
&& identity.email() == existing_identity.email()
{
total_existing += 1;
continue 'outer;
}
}
@ -798,22 +815,21 @@ async fn import_identities(client: &Client, path: &Path) {
if let (Some(id), Some(name), Some(email)) =
(identity.id(), identity.name(), identity.email())
{
if name == "vacation" {
continue;
}
create_ids.push(id);
let create_request = set_request.create_with_id(id).name(name).email(email);
if let Some(reply_to) = identity.reply_to() {
create_request.reply_to(reply_to.iter().cloned().into());
}
if let Some(bcc) = identity.bcc() {
create_request.bcc(bcc.iter().cloned().into());
}
if let Some(html_signature) = identity.html_signature() {
create_request.html_signature(html_signature);
}
if let Some(text_signature) = identity.text_signature() {
create_request.text_signature(text_signature);
if name != "vacation" {
create_ids.push(id);
let create_request = set_request.create_with_id(id).name(name).email(email);
if let Some(reply_to) = identity.reply_to() {
create_request.reply_to(reply_to.iter().cloned().into());
}
if let Some(bcc) = identity.bcc() {
create_request.bcc(bcc.iter().cloned().into());
}
if let Some(html_signature) = identity.html_signature() {
create_request.html_signature(html_signature);
}
if let Some(text_signature) = identity.text_signature() {
create_request.text_signature(text_signature);
}
}
} else {
eprintln!("Skipping identity with no id, name, and/or email.");
@ -821,23 +837,31 @@ async fn import_identities(client: &Client, path: &Path) {
}
}
match request.send_set_identity().await {
Ok(mut response) => {
let mut total_imported = 0;
for id in create_ids {
if let Err(err) = response.created(&id) {
eprintln!("Failed to import identity {id}: {err}");
} else {
total_imported += 1;
let mut total_imported = 0;
if !create_ids.is_empty() {
match request.send_set_identity().await {
Ok(mut response) => {
for id in create_ids {
if let Err(err) = response.created(&id) {
eprintln!("Failed to import identity {id}: {err}");
} else {
total_imported += 1;
}
}
}
eprintln!("Successfully imported {} identities.", total_imported);
}
Err(err) => {
eprintln!("Failed to import identities: {err}");
Err(err) => {
eprintln!("Failed to import identities: {err}");
return;
}
}
}
eprintln!(
"Successfully processed {} identities ({} imported, {} already exist).",
total_imported + total_existing,
total_imported,
total_existing
);
}
async fn import_vacation_responses(client: &Client, path: &Path) {
@ -849,6 +873,7 @@ async fn import_vacation_responses(client: &Client, path: &Path) {
}
let existing_vacation_responses = fetch_vacation_responses(client).await;
if !existing_vacation_responses.is_empty() {
eprintln!("Successfully processed 1 vacation response (0 imported, 1 already exist).",);
return;
}
@ -881,7 +906,9 @@ async fn import_vacation_responses(client: &Client, path: &Path) {
if let Err(err) = response.created(&create_id) {
eprintln!("Failed to import vacation response: {err}");
} else {
eprintln!("Successfully imported 1 vacation response.");
eprintln!(
"Successfully processed 1 vacation response (1 imported, 0 already exist).",
);
}
}
Err(err) => {
@ -906,15 +933,17 @@ fn build_mailbox_tree(
'outer: loop {
while let Some(mailbox) = mailboxes_iter.next() {
if parent_id == mailbox.parent_id() {
let name = mailbox.name().unwrap_result("obtain mailbox name");
if parents.contains(&mailbox.id()) {
stack.push((path.clone(), parent_id, mailboxes_iter));
parent_id = mailbox.id();
path.push(mailbox.name().unwrap_result("obtain mailbox name"));
path.push(name);
results.insert(path.clone(), mailbox);
mailboxes_iter = mailboxes.iter();
continue 'outer;
} else {
let mut path = path.clone();
path.push(mailbox.name().unwrap_result("obtain mailbox name"));
path.push(name);
results.insert(path, mailbox);
}
}
@ -927,6 +956,7 @@ fn build_mailbox_tree(
break;
}
}
debug_assert_eq!(results.len(), mailboxes.len());
results
}

View file

@ -32,6 +32,7 @@ use jmap_client::{
};
pub mod cli;
pub mod database;
pub mod export;
pub mod import;
pub mod queue;

View file

@ -26,20 +26,34 @@ use jmap_proto::{
types::{collection::Collection, property::Property, value::Value},
};
use store::{
write::{assert::HashedValue, BatchBuilder},
BitmapKey, ValueKey,
write::{assert::HashedValue, BatchBuilder, Operation, ValueClass},
BitmapKey, Serialize, ValueKey,
};
use crate::{mailbox::set::SCHEMA, JMAP};
use crate::{auth::authenticate::AccountKey, mailbox::set::SCHEMA, JMAP};
impl JMAP {
pub async fn delete_account(&self, account_id: u32) -> store::Result<()> {
pub async fn delete_account(&self, account_name: &str, account_id: u32) -> store::Result<()> {
// Delete blobs
self.store.delete_account_blobs(account_id).await?;
// Delete mailboxes
let mut batch = BatchBuilder::new();
batch
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::name_to_id(account_name),
},
set: None,
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::id_to_name(account_id),
},
set: None,
})
.with_account_id(account_id)
.with_collection(Collection::Mailbox);
for mailbox_id in self
@ -73,4 +87,40 @@ impl JMAP {
Ok(())
}
pub async fn rename_account(
&self,
new_account_name: &str,
account_name: &str,
account_id: u32,
) -> store::Result<()> {
// Delete blobs
self.store.delete_account_blobs(account_id).await?;
// Delete mailboxes
let mut batch = BatchBuilder::new();
batch
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::name_to_id(account_name),
},
set: None,
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::name_to_id(new_account_name),
},
set: account_id.serialize().into(),
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: AccountKey::id_to_name(account_id),
},
set: new_account_name.serialize().into(),
});
self.store.write(batch.build()).await?;
Ok(())
}
}

View file

@ -45,7 +45,7 @@ use tokio::{
use utils::listener::{ServerInstance, SessionData, SessionManager};
use crate::{
auth::oauth::OAuthMetadata,
auth::{oauth::OAuthMetadata, AccessToken},
blob::{DownloadResponse, UploadResponse},
services::state,
websocket::upgrade::upgrade_websocket_connection,
@ -77,7 +77,7 @@ pub async fn parse_jmap_request(
match (path.next().unwrap_or(""), req.method()) {
("", &Method::POST) => {
return match fetch_body(&mut req, jmap.config.request_max_size)
return match fetch_body(&mut req, jmap.config.request_max_size, &access_token)
.await
.ok_or_else(|| RequestError::limit(RequestLimitError::SizeRequest))
.and_then(|bytes| {
@ -127,7 +127,13 @@ pub async fn parse_jmap_request(
("upload", &Method::POST) => {
if let Some(account_id) = path.next().and_then(|p| Id::from_bytes(p.as_bytes()))
{
return match fetch_body(&mut req, jmap.config.upload_max_size).await {
return match fetch_body(
&mut req,
jmap.config.upload_max_size,
&access_token,
)
.await
{
Some(bytes) => {
match jmap
.blob_upload(
@ -245,23 +251,77 @@ pub async fn parse_jmap_request(
req.method(),
) {
("account", "delete", &Method::GET) => {
return if let Some(account_id) = path.next().and_then(|s| s.parse::<u32>().ok())
{
match jmap.delete_account(account_id).await {
Ok(_) => JsonResponse::new(Value::String("success".into()))
return if let Some(account_name) = path.next() {
if let Ok(Some(account_id)) = jmap.try_get_account_id(account_name).await {
match jmap.delete_account(account_name, account_id).await {
Ok(_) => JsonResponse::new(Value::String("success".into()))
.into_http_response(),
Err(err) => RequestError::blank(
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
"Account deletion failed",
err.to_string(),
)
.into_http_response(),
Err(err) => RequestError::blank(
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
"Account deletion failed",
err.to_string(),
}
} else {
RequestError::blank(
StatusCode::NOT_FOUND.as_u16(),
"Not found",
"Account not found.",
)
.into_http_response(),
.into_http_response()
}
} else {
RequestError::blank(
StatusCode::BAD_REQUEST.as_u16(),
"Invalid parameters",
"Expected account id",
"Expected account name",
)
.into_http_response()
};
}
("account", "rename", &Method::GET) => {
return if let (Some(account_name), Some(new_account_name)) =
(path.next(), path.next())
{
match (
jmap.try_get_account_id(account_name).await,
jmap.try_get_account_id(new_account_name).await,
) {
(Ok(Some(account_id)), Ok(None)) => {
match jmap
.rename_account(new_account_name, account_name, account_id)
.await
{
Ok(_) => JsonResponse::new(Value::String("success".into()))
.into_http_response(),
Err(err) => RequestError::blank(
StatusCode::INTERNAL_SERVER_ERROR.as_u16(),
"Account rename failed",
err.to_string(),
)
.into_http_response(),
}
}
(Ok(None), _) => RequestError::blank(
StatusCode::NOT_FOUND.as_u16(),
"Not found",
"Account not found.",
)
.into_http_response(),
(_, Ok(Some(_))) => RequestError::blank(
StatusCode::BAD_REQUEST.as_u16(),
"Invalid parameters",
"New account name already exists.",
)
.into_http_response(),
_ => RequestError::internal_server_error().into_http_response(),
}
} else {
RequestError::blank(
StatusCode::BAD_REQUEST.as_u16(),
"Invalid parameters",
"Expected old and new account names",
)
.into_http_response()
};
@ -374,11 +434,16 @@ async fn handle_request<T: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
}
}
pub async fn fetch_body(req: &mut HttpRequest, max_size: usize) -> Option<Vec<u8>> {
pub async fn fetch_body(
req: &mut HttpRequest,
max_size: usize,
access_token: &AccessToken,
) -> Option<Vec<u8>> {
let mut bytes = Vec::with_capacity(1024);
while let Some(Ok(frame)) = req.frame().await {
if let Some(data) = frame.data_ref() {
if bytes.len() + data.len() <= max_size {
if bytes.len() + data.len() <= max_size || max_size == 0 || access_token.is_super_user()
{
bytes.extend_from_slice(data);
} else {
return None;

View file

@ -153,32 +153,29 @@ impl JMAP {
}
}
pub async fn try_get_account_id(&self, name: &str) -> Result<Option<u32>, MethodError> {
self.store
.get_value::<u32>(CustomValueKey {
value: AccountKey::name_to_id(name),
})
.await
.map_err(|err| {
tracing::error!(event = "error",
context = "store",
account_name = name,
error = ?err,
"Failed to retrieve account id");
MethodError::ServerPartialFail
})
}
pub async fn get_account_id(&self, name: &str) -> Result<u32, MethodError> {
let mut try_count = 0;
loop {
// Try to obtain ID
match self
.store
.get_value::<u32>(CustomValueKey {
value: KeySerializer::new(name.len() + std::mem::size_of::<u32>() + 1)
.write(u32::MAX)
.write(0u8)
.write(name)
.finalize(),
})
.await
{
Ok(Some(id)) => return Ok(id),
Ok(None) => {}
Err(err) => {
tracing::error!(event = "error",
context = "store",
account_name = name,
error = ?err,
"Failed to retrieve account id");
return Err(MethodError::ServerPartialFail);
}
if let Some(account_id) = self.try_get_account_id(name).await? {
return Ok(account_id);
}
// Assign new ID
@ -187,11 +184,7 @@ impl JMAP {
.await?;
// Serialize key
let key = KeySerializer::new(name.len() + std::mem::size_of::<u32>() + 1)
.write(u32::MAX)
.write(0u8)
.write(name)
.finalize();
let key = AccountKey::name_to_id(name);
// Write account ID
let mut batch = BatchBuilder::new();
@ -206,11 +199,7 @@ impl JMAP {
})
.op(Operation::Value {
class: ValueClass::Custom {
bytes: KeySerializer::new(std::mem::size_of::<u32>() * 2 + 1)
.write(u32::MAX)
.write(1u8)
.write(account_id)
.finalize(),
bytes: AccountKey::id_to_name(account_id),
},
set: name.serialize().into(),
});
@ -249,11 +238,7 @@ impl JMAP {
pub async fn get_account_name(&self, account_id: u32) -> Result<Option<String>, MethodError> {
self.store
.get_value::<String>(CustomValueKey {
value: KeySerializer::new(std::mem::size_of::<u32>() * 2 + 1)
.write(u32::MAX)
.write(1u8)
.write(account_id)
.finalize(),
value: AccountKey::id_to_name(account_id),
})
.await
.map_err(|err| {
@ -333,3 +318,22 @@ impl JMAP {
}
}
}
pub struct AccountKey();
impl AccountKey {
pub fn name_to_id(name: &str) -> Vec<u8> {
KeySerializer::new(name.len() + std::mem::size_of::<u32>() + 1)
.write(u32::MAX)
.write(0u8)
.write(name)
.finalize()
}
pub fn id_to_name(id: u32) -> Vec<u8> {
KeySerializer::new(std::mem::size_of::<u32>() * 2 + 1)
.write(u32::MAX)
.write(1u8)
.write(id)
.finalize()
}
}

View file

@ -23,13 +23,11 @@
use std::{collections::HashMap, sync::atomic::AtomicU32};
use http_body_util::BodyExt;
use hyper::{header::CONTENT_TYPE, StatusCode};
use serde::{Deserialize, Serialize};
use crate::api::{
http::{fetch_body, ToHttpResponse},
HtmlResponse, HttpRequest, HttpResponse,
};
use crate::api::{http::ToHttpResponse, HtmlResponse, HttpRequest, HttpResponse};
pub mod device_auth;
pub mod token;
@ -221,7 +219,7 @@ pub async fn parse_form_data(
.get(CONTENT_TYPE)
.and_then(|h| h.to_str().ok())
.and_then(|val| val.parse::<mime::Mime>().ok()),
fetch_body(req, 2048).await,
fetch_body(req).await,
) {
(Some(content_type), Some(body)) => {
let mut fields = HashMap::new();
@ -245,3 +243,17 @@ pub async fn parse_form_data(
.into_http_response()),
}
}
pub async fn fetch_body(req: &mut HttpRequest) -> Option<Vec<u8>> {
let mut bytes = Vec::with_capacity(1024);
while let Some(Ok(frame)) = req.frame().await {
if let Some(data) = frame.data_ref() {
if bytes.len() + data.len() <= 2048 {
bytes.extend_from_slice(data);
} else {
return None;
}
}
}
bytes.into()
}

View file

@ -51,7 +51,7 @@ use store::{
use crate::{
auth::{acl::EffectiveAcl, AccessToken},
JMAP, SUPERUSER_ID,
JMAP,
};
use super::{INBOX_ID, TRASH_ID};
@ -755,7 +755,7 @@ impl JMAP {
}
#[cfg(feature = "test_mode")]
if mailbox_ids.is_empty() && account_id == SUPERUSER_ID {
if mailbox_ids.is_empty() && account_id == crate::SUPERUSER_ID {
return Ok(mailbox_ids);
}

View file

@ -53,15 +53,14 @@ impl JMAP {
.map_err(|_| MethodError::ServerPartialFail)?
{
let account_id = self.get_account_id(&principal.name).await?;
if is_set {
if is_set || result_set.results.contains(account_id) {
result_set.results =
RoaringBitmap::from_sorted_iter([account_id]).unwrap();
} else if result_set.results.contains(account_id) {
result_set.results.remove(account_id);
} else {
result_set.results = RoaringBitmap::new();
}
} else {
result_set.results = RoaringBitmap::new();
}
is_set = false;
}
@ -77,6 +76,7 @@ impl JMAP {
}
if is_set {
result_set.results = ids;
is_set = false;
} else {
result_set.results &= ids;
}

View file

@ -21,10 +21,12 @@
* for more details.
*/
use std::{sync::Arc, time::Duration};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use chrono::{Datelike, TimeZone, Timelike};
use store::write::now;
use tokio::sync::mpsc;
use utils::{config::Config, failed, map::ttl_dashmap::TtlMap, UnwrapFailure};
@ -75,7 +77,7 @@ pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Recei
purge_cache.time_to_next(),
];
let mut tasks_to_run = [false, false, false];
let start_time = now();
let start_time = Instant::now();
match tokio::time::timeout(time_to_next.iter().min().copied().unwrap(), rx.recv()).await
{
@ -96,9 +98,9 @@ pub fn spawn_housekeeper(core: Arc<JMAP>, settings: &Config, mut rx: mpsc::Recei
}
// Check which tasks are due for execution
let now = now();
let now = Instant::now();
for (pos, time_to_next) in time_to_next.into_iter().enumerate() {
if start_time + time_to_next.as_secs() <= now {
if start_time + time_to_next <= now {
tasks_to_run[pos] = true;
}
}

View file

@ -108,7 +108,7 @@ impl ReadTransaction<'_> {
}
Ok(sorted_results)
} else {
} else if comparators.len() > 1 {
//TODO improve this algorithm, avoid re-sorting in memory.
let mut sorted_ids = AHashMap::with_capacity(paginate.limit);
@ -208,6 +208,33 @@ impl ReadTransaction<'_> {
}
}
Ok(paginate.build())
} else {
let mut seen_prefixes = AHashSet::new();
for document_id in result_set.results {
// Obtain document prefixId
let prefix_id = if let Some(prefix_key) = &paginate.prefix_key {
if let Some(prefix_id) = self
.get_value(prefix_key.with_document_id(document_id))
.await?
{
if paginate.prefix_unique && !seen_prefixes.insert(prefix_id) {
continue;
}
prefix_id
} else {
// Document no longer exists?
continue;
}
} else {
0
};
// Add document to results
if !paginate.add(prefix_id, document_id) {
break;
}
}
Ok(paginate.build())
}
}

View file

@ -3,7 +3,7 @@ hostname = "test.example.org"
[server.listener.jmap]
bind = ["127.0.0.1:9990"]
url = "https://127.0.0.1:8899"
url = "https://127.0.0.1:9990"
protocol = "jmap"
max-connections = 8192
@ -97,8 +97,8 @@ type = "local"
path = "/tmp/stalwart-test"
[certificate.default]
cert = "file://./tests/resources/tls_cert.pem"
private-key = "file://./tests/resources/tls_privatekey.pem"
cert = "file://../../tests/resources/tls_cert.pem"
private-key = "file://../../tests/resources/tls_privatekey.pem"
[jmap]
directory = "local"
@ -146,7 +146,8 @@ domains = ["example.org"]
[[directory."local".users]]
name = "admin"
description = "Superadmin"
secret = "donotuse"
secret = "secret"
member-of = ["superusers"]
[[directory."local".users]]
name = "john"
@ -180,6 +181,10 @@ description = "Sales Team"
name = "support"
description = "Support Team"
[[directory."local".groups]]
name = "superusers"
description = "Superusers"
[oauth]
key = "parerga_und_paralipomena"
max-auth-attempts = 1